Skip to content

Commit

Permalink
Merge pull request #2094 from beyonnex-io/bugfix/connectivity-tracing…
Browse files Browse the repository at this point in the history
…-parent-fix

fix spans for consuming messages to not be the "parent" span for following spans
  • Loading branch information
thjaeckle authored Jan 15, 2025
2 parents dd245af + 477650e commit d02b05f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.JmsMessageConsumer;
import org.apache.qpid.jms.message.JmsMessage;
Expand All @@ -63,20 +70,12 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.TracingSpans;

import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Sink;

/**
* Actor which receives message from an AMQP source and forwards them to a {@code MessageMappingProcessorActor}.
*/
Expand Down Expand Up @@ -342,10 +341,11 @@ private void messageConsumerFailed(final Status.Failure failure) {
handleAddressStatus(addressStatus);
}

private void handleJmsMessage(final JmsMessage message) {
private void handleJmsMessage(final JmsMessage message) throws JMSException {
Map<String, String> headers = null;
String correlationId = null;
var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("amqp_consume"));
var startedSpan = TracingSpans.emptyStartedSpan(
SpanOperationName.of("amqp_consume: " + message.getJMSDestination())
);
try {
recordIncomingForRateLimit(message.getJMSMessageID());
if (logger.isDebugEnabled()) {
Expand All @@ -358,12 +358,16 @@ private void handleJmsMessage(final JmsMessage message) {
ackType);
}
headers = extractHeadersMapFromJmsMessage(message);
correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey());
final String correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey());
startedSpan = DittoTracing.newPreparedSpan(headers, startedSpan.getOperationName())
.correlationId(correlationId)
.connectionId(connectionId)
.start();
headers = startedSpan.propagateContext(headers);
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.pekko.kafka.ConsumerMessage;
import org.eclipse.ditto.base.model.common.ByteBufferUtils;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -41,8 +42,6 @@
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;

import org.apache.pekko.kafka.ConsumerMessage;

/**
* Transforms incoming messages from Apache Kafka to {@link org.eclipse.ditto.connectivity.api.ExternalMessage}.
*/
Expand Down Expand Up @@ -108,11 +107,16 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
final String correlationId = messageHeaders
.getOrDefault(DittoHeaderDefinition.CORRELATION_ID.getKey(), UUID.randomUUID().toString());

final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders, SpanOperationName.of("kafka_consume"))
.correlationId(correlationId)
final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders,
SpanOperationName.of("kafka_consume: " + consumerRecord.topic())
).correlationId(correlationId)
.connectionId(connectionId)
.start();
messageHeaders = startedSpan.propagateContext(messageHeaders);
messageHeaders = startedSpan.propagateContext(DittoHeaders.of(messageHeaders)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
);

try {
final String key = consumerRecord.key();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

import javax.annotation.Nullable;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.javadsl.Sink;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand Down Expand Up @@ -51,12 +56,6 @@
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.javadsl.Sink;


/**
* Actor which receives message from an RabbitMQ source and forwards them to a {@code MessageMappingProcessorActor}.
Expand Down Expand Up @@ -135,7 +134,9 @@ private void handleDelivery(final Delivery delivery) {
final Envelope envelope = delivery.getEnvelope();
final byte[] body = delivery.getBody();

var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("rabbitmq_consume"));
var startedSpan = TracingSpans.emptyStartedSpan(
SpanOperationName.of("rabbitmq_consume: " + envelope.getExchange())
);
Map<String, String> headers = null;
try {
@Nullable final String correlationId = properties.getCorrelationId();
Expand All @@ -150,7 +151,11 @@ private void handleDelivery(final Delivery delivery) {
.connectionId(connectionId)
.correlationId(correlationId)
.start();
headers = startedSpan.propagateContext(headers);
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
);

final ExternalMessageBuilder externalMessageBuilder =
ExternalMessageFactory.newExternalMessageBuilder(headers);
Expand Down

0 comments on commit d02b05f

Please sign in to comment.