diff --git a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/Headers.java b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/Headers.java index 64678c8..770d708 100644 --- a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/Headers.java +++ b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/Headers.java @@ -1,5 +1,5 @@ /** - * Copyright 2017 The OpenTracing Authors + * Copyright 2017-2019 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,4 +20,5 @@ public final class Headers { public static final String MESSAGE_SENT_FROM_CLIENT = "messageSent"; public static final String MESSAGE_CONSUMED = "messageConsumed"; + static final String SCOPE_HEADER = OpenTracingChannelInterceptor.class.getName() + ".SCOPE"; } diff --git a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/MessageTextMap.java b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/MessageTextMap.java index 5af40aa..d276e86 100644 --- a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/MessageTextMap.java +++ b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/MessageTextMap.java @@ -63,6 +63,10 @@ public void put(String key, String value) { headers.put(key, byteHeaders.contains(key) ? value.getBytes() : value); } + public void addHeader(String key, Object value) { + headers.put(key, value); + } + public Message getMessage() { return MessageBuilder.fromMessage(message) .copyHeaders(headers) diff --git a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptor.java b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptor.java index b7fe9ff..1f605b8 100644 --- a/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptor.java +++ b/opentracing-spring-messaging/src/main/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptor.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2018 The OpenTracing Authors + * Copyright 2017-2019 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,6 +13,8 @@ */ package io.opentracing.contrib.spring.integration.messaging; +import static io.opentracing.contrib.spring.integration.messaging.Headers.SCOPE_HEADER; + import io.opentracing.References; import io.opentracing.Scope; import io.opentracing.Span; @@ -28,14 +30,13 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.ExecutorChannelInterceptor; import org.springframework.util.ClassUtils; /** * @author Gytis Trikleris */ -public class OpenTracingChannelInterceptor extends ChannelInterceptorAdapter implements ExecutorChannelInterceptor { +public class OpenTracingChannelInterceptor implements ExecutorChannelInterceptor { private static final Log log = LogFactory.getLog(OpenTracingChannelInterceptor.class); static final String COMPONENT_NAME = "spring-messaging"; @@ -66,8 +67,9 @@ public Message preSend(Message message, MessageChannel channel) { spanBuilder.asChildOf(extractedContext); } - Span span = spanBuilder.startActive(true).span(); - + Span span = spanBuilder.start(); + Scope scope = tracer.activateSpan(span); + carrier.addHeader(SCOPE_HEADER, scope); if (isConsumer) { log.trace("Adding 'messageConsumed' header"); carrier.put(Headers.MESSAGE_CONSUMED, "true"); @@ -76,6 +78,7 @@ public Message preSend(Message message, MessageChannel channel) { log.trace("Adding 'messageSent' header"); carrier.put(Headers.MESSAGE_SENT_FROM_CLIENT, "true"); } + log.trace(String.format("Pre-send: starting a new span %s , carrier extracted context %s", span, extractedContext)); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, carrier); return carrier.getMessage(); @@ -83,16 +86,19 @@ public Message preSend(Message message, MessageChannel channel) { @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex) { - Scope scope = tracer.scopeManager().active(); - if (scope == null) { - return; + Object scopeValue = message.getHeaders().get(SCOPE_HEADER); + if (scopeValue instanceof Scope) { + Span span = tracer.scopeManager().activeSpan(); + closeResources(ex, (Scope) scopeValue, span); } + } - log.trace(String.format("Completed sending and current span is %s", scope.span())); - handleException(ex, scope.span()); - log.trace("Closing messaging span scope " + scope); - scope.close(); - log.trace(String.format("Messaging span scope %s successfully closed", scope)); + private void closeResources(Exception ex, Scope scopeValue, Span span) { + handleException(ex, span); + span.finish(); + log.trace(String.format("Completed sending of current span %s", span)); + scopeValue.close(); + log.trace(String.format("Scope %s successfully closed", scopeValue)); } @Override diff --git a/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorIT.java b/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorIT.java index 4a35f7b..bfb2684 100644 --- a/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorIT.java +++ b/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorIT.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2018 The OpenTracing Authors + * Copyright 2017-2019 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -162,9 +162,11 @@ public void shouldCreateSpanWithExportedParent() { public void shouldCreateSpanWithActiveParent() { MockSpan parentSpan = mockTracer.buildSpan("http:testSendMessage") .start(); - try (Scope scope = mockTracer.scopeManager().activate(parentSpan, true)) { + try (Scope scope = mockTracer.scopeManager().activate(parentSpan)) { tracedChannel.send(MessageBuilder.withPayload("hi") .build()); + } finally { + parentSpan.finish(); } then(message).isNotNull(); diff --git a/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorTest.java b/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorTest.java index 1f894d4..a9b4b78 100644 --- a/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorTest.java +++ b/opentracing-spring-messaging/src/test/java/io/opentracing/contrib/spring/integration/messaging/OpenTracingChannelInterceptorTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2018 The OpenTracing Authors + * Copyright 2017-2019 The OpenTracing Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,6 +13,7 @@ */ package io.opentracing.contrib.spring.integration.messaging; +import static io.opentracing.contrib.spring.integration.messaging.Headers.SCOPE_HEADER; import static io.opentracing.contrib.spring.integration.messaging.OpenTracingChannelInterceptor.COMPONENT_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; @@ -78,11 +79,12 @@ public void before() { MockitoAnnotations.initMocks(this); when(mockTracer.buildSpan(anyString())).thenReturn(mockSpanBuilder); when(mockSpanBuilder.asChildOf(any(SpanContext.class))).thenReturn(mockSpanBuilder); - when(mockSpanBuilder.startActive(true)).thenReturn(mockScope); + when(mockSpanBuilder.start()).thenReturn(mockSpan); when(mockTracer.scopeManager()).thenReturn(mockScopeManager); - when(mockScope.span()).thenReturn(mockSpan); when(mockSpanBuilder.withTag(anyString(), anyString())).thenReturn(mockSpanBuilder); when(mockSpan.context()).thenReturn(mockSpanContext); + when(mockTracer.activateSpan(mockSpan)).thenReturn(mockScope); + when(mockScopeManager.activate(mockSpan)).thenReturn(mockScope); interceptor = new OpenTracingChannelInterceptor(mockTracer); simpleMessage = MessageBuilder.withPayload("test") @@ -108,7 +110,7 @@ public void preSendShouldStartSpanForClientSentMessage() { assertThat(message.getHeaders()).containsKey(Headers.MESSAGE_SENT_FROM_CLIENT); verify(mockTracer).buildSpan(String.format("send:%s", mockMessageChannel.toString())); - verify(mockSpanBuilder).startActive(true); + verify(mockSpanBuilder).start(); verify(mockSpanBuilder).withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME); verify(mockSpanBuilder).withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), mockMessageChannel.toString()); verify(mockSpanBuilder).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); @@ -125,7 +127,7 @@ public void preSendShouldStartSpanForServerReceivedMessage() { verify(mockTracer).extract(eq(Format.Builtin.TEXT_MAP), any(MessageTextMap.class)); verify(mockTracer).buildSpan(String.format("receive:%s", mockMessageChannel.toString())); verify(mockSpanBuilder).addReference(References.FOLLOWS_FROM, null); - verify(mockSpanBuilder).startActive(true); + verify(mockSpanBuilder).start(); verify(mockSpanBuilder).withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME); verify(mockSpanBuilder).withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), mockMessageChannel.toString()); verify(mockSpanBuilder).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER); @@ -133,17 +135,23 @@ public void preSendShouldStartSpanForServerReceivedMessage() { @Test public void afterSendCompletionShouldDoNothingWithoutSpan() { - interceptor.afterSendCompletion(null, null, true, null); + Message message = MessageBuilder.fromMessage(simpleMessage) + .setHeader(Headers.MESSAGE_CONSUMED, true).setHeader(SCOPE_HEADER, mockScope) + .build(); + when(mockScopeManager.activeSpan()).thenReturn(mockSpan); + + interceptor.afterSendCompletion(message, null, true, null); verify(mockSpan, times(0)).log(anyString()); + verify(mockSpan, times(1)).finish(); } @Test public void afterSendCompletionShouldFinishSpanForServerSendMessage() { Message message = MessageBuilder.fromMessage(simpleMessage) - .setHeader(Headers.MESSAGE_CONSUMED, true) + .setHeader(Headers.MESSAGE_CONSUMED, true).setHeader(SCOPE_HEADER, mockScope) .build(); - when(mockScopeManager.active()).thenReturn(mockScope); + when(mockScopeManager.activeSpan()).thenReturn(mockSpan); interceptor.afterSendCompletion(message, null, true, null); @@ -152,18 +160,24 @@ public void afterSendCompletionShouldFinishSpanForServerSendMessage() { @Test public void afterSendCompletionShouldFinishSpanForClientSendMessage() { - when(mockScopeManager.active()).thenReturn(mockScope); + Message message = MessageBuilder.fromMessage(simpleMessage) + .setHeader(SCOPE_HEADER, mockScope) + .build(); + when(mockScopeManager.activeSpan()).thenReturn(mockSpan); - interceptor.afterSendCompletion(simpleMessage, null, true, null); + interceptor.afterSendCompletion(message, null, true, null); verify(mockScope).close(); } @Test public void afterSendCompletionShouldFinishSpanForException() { - when(mockScopeManager.active()).thenReturn(mockScope); + Message message = MessageBuilder.fromMessage(simpleMessage) + .setHeader(SCOPE_HEADER, mockScope) + .build(); + when(mockScopeManager.activeSpan()).thenReturn(mockSpan); - interceptor.afterSendCompletion(simpleMessage, null, true, new Exception("test")); + interceptor.afterSendCompletion(message, null, true, new Exception("test")); verify(mockSpan).setTag(Tags.ERROR.getKey(), true); verify(mockScope).close(); diff --git a/pom.xml b/pom.xml index 1d53951..4fee6e9 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ UTF-8 ${project.basedir} - 0.32.0 + 0.33.0 3.1.6 2.1.4.RELEASE Greenwich.SR1