diff --git a/config/jvm.options b/config/jvm.options index 084ff034b6d..34025821671 100644 --- a/config/jvm.options +++ b/config/jvm.options @@ -79,4 +79,17 @@ # # Sets the maximum nesting depth. The depth is a count of objects and arrays that have not # been closed, `{` and `[` respectively. -#-Dlogstash.jackson.stream-read-constraints.max-nesting-depth=1000 \ No newline at end of file +#-Dlogstash.jackson.stream-read-constraints.max-nesting-depth=1000 + +# OTel +-javaagent:/Users/andrea/workspace/onweek_otel_tracing_ls/elastic-otel-javaagent-1.4.1.jar +#-Dotel.service.name=logstash-otel +#-Dotel.metrics.exporter=otlp +#-Dotel.logs.exporter=otlp +#-Dotel.traces.exporter=otlp +#-Dotel.resource.attributes=service.version=1.0,deployment.environment=production +#-Dotel.exporter.otlp.endpoint= +#-Dotel.exporter.otlp.headers=Authorization=Bearer%20 + +#-Dotel.javaagent.logging=application +#-Dotel.javaagent.debug=true \ No newline at end of file diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 4ae67b0c2d5..3bcabbf1a9d 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -266,4 +266,7 @@ dependencies { api group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14' api group: 'commons-codec', name: 'commons-codec', version: '1.17.0' api group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.16' + + // OTel dependencies + implementation 'io.opentelemetry:opentelemetry-api:1.25.0' } diff --git a/logstash-core/src/main/java/org/logstash/Event.java b/logstash-core/src/main/java/org/logstash/Event.java index e1e9f4db1f2..8cef1c23741 100644 --- a/logstash-core/src/main/java/org/logstash/Event.java +++ b/logstash-core/src/main/java/org/logstash/Event.java @@ -22,6 +22,7 @@ import co.elastic.logstash.api.EventFactory; import com.fasterxml.jackson.core.JsonProcessingException; +import io.opentelemetry.api.trace.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.joda.time.DateTime; @@ -71,6 +72,7 @@ public final class Event implements Cloneable, Queueable, co.elastic.logstash.ap private static final FieldReference TAGS_FAILURE_FIELD = FieldReference.from(TAGS_FAILURE); private static final Logger logger = LogManager.getLogger(Event.class); + private transient Map contextualSpans = new HashMap<>(); public Event() { @@ -551,6 +553,22 @@ public static Event deserialize(byte[] data) throws IOException { return fromSerializableMap(data); } + public void associateSpan(Span span, String contextName) { + this.contextualSpans.put(contextName, span); + } + + public void endSpan(String contextName) { + Span span = spanForContext(contextName); + if (span == null) { + return; + } + span.end(); + } + + public Span spanForContext(String contextName) { + return this.contextualSpans.get(contextName); + } + public static class InvalidTagsTypeException extends RuntimeException { private static final long serialVersionUID = 1L; diff --git a/logstash-core/src/main/java/org/logstash/OTelUtil.java b/logstash-core/src/main/java/org/logstash/OTelUtil.java new file mode 100644 index 00000000000..a4fa6789202 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/OTelUtil.java @@ -0,0 +1,22 @@ +package org.logstash; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; + +public class OTelUtil { + // Get the global OpenTelemetry instance configured by the agent + public static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); + + // Create a tracer for this class/service + public static final Tracer tracer = openTelemetry.getTracer("Logstash"); + public static final String METADATA_OTEL_CONTEXT = "otel_context"; + public static final String METADATA_OTEL_FULLCONTEXT = "otel_full_context"; + + public static Span newSpan(String name) { + return tracer.spanBuilder(name) + .setNoParent() + .startSpan(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index e851e0fa899..321d45fb9ed 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -19,9 +19,15 @@ package org.logstash.config.ir; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jruby.RubyArray; import org.jruby.RubyHash; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.Event; import org.logstash.RubyUtil; import org.logstash.Rubyfier; import org.logstash.common.EnvironmentVariableProvider; @@ -57,6 +63,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.logstash.OTelUtil.tracer; import static org.logstash.config.ir.compiler.Utils.copyNonCancelledEvents; /** @@ -68,6 +75,8 @@ */ public final class CompiledPipeline { + private static final Logger LOGGER = LogManager.getLogger(CompiledPipeline.class); + /** * Compiler for conditional expressions that turn {@link IfVertex} into {@link EventCondition}. */ @@ -168,6 +177,7 @@ public CompiledPipeline.CompiledExecution buildExecution() { * @return CompiledPipeline.CompiledExecution the compiled pipeline */ public CompiledPipeline.CompiledExecution buildExecution(boolean orderedExecution) { + LOGGER.info("Pipeline executes in {} mode", orderedExecution ? "ordered" : "unordered"); return orderedExecution ? new CompiledPipeline.CompiledOrderedExecution() : new CompiledPipeline.CompiledUnorderedExecution(); @@ -320,25 +330,35 @@ public final class CompiledOrderedExecution extends CompiledExecution { @Override public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) { - return compute(batch.events(), flush, shutdown); + startWorkerSpans(batch.events()); + int result = compute(batch.events(), flush, shutdown); + stopWorkerSpans(batch.events()); + return result; } + @SuppressWarnings({"unchecked"}) @Override public int compute(final Collection batch, final boolean flush, final boolean shutdown) { if (!batch.isEmpty()) { - @SuppressWarnings({"unchecked"}) final RubyArray outputBatch = RubyUtil.RUBY.newArray(); - @SuppressWarnings({"unchecked"}) final RubyArray filterBatch = RubyUtil.RUBY.newArray(1); + final RubyArray outputBatch = RubyUtil.RUBY.newArray(); + final RubyArray filterBatch = RubyUtil.RUBY.newArray(1); // send batch one-by-one as single-element batches down the filters for (final RubyEvent e : batch) { filterBatch.set(0, e); + startSpans(filterBatch, "pipeline.filters"); _compute(filterBatch, outputBatch, flush, shutdown); + stopSpans(filterBatch, "pipeline.filters"); } + startSpans(outputBatch, "pipeline.outputs"); compiledOutputs.compute(outputBatch, flush, shutdown); + stopSpans(outputBatch, "pipeline.outputs"); return outputBatch.size(); } else if (flush || shutdown) { - @SuppressWarnings({"unchecked"}) final RubyArray outputBatch = RubyUtil.RUBY.newArray(); + final RubyArray outputBatch = RubyUtil.RUBY.newArray(); _compute(EMPTY_ARRAY, outputBatch, flush, shutdown); + startSpans(outputBatch, "pipeline.outputs"); compiledOutputs.compute(outputBatch, flush, shutdown); + stopSpans(outputBatch, "pipeline.outputs"); return outputBatch.size(); } return 0; @@ -351,6 +371,22 @@ private void _compute(final RubyArray batch, final RubyArray events) { + for (RubyEvent event : events) { + Event javaEvent = event.getEvent(); + javaEvent.endSpan("worker"); + } + } + + private void startWorkerSpans(Collection events) { + for (RubyEvent event : events) { + Event javaEvent = event.getEvent(); + Span span = tracer.spanBuilder("worker") + .startSpan(); + javaEvent.associateSpan(span, "worker"); + } + } + public final class CompiledUnorderedExecution extends CompiledExecution { @Override @@ -358,14 +394,22 @@ public int compute(final QueueBatch batch, final boolean flush, final boolean sh return compute(batch.events(), flush, shutdown); } + @SuppressWarnings({"unchecked"}) @Override public int compute(final Collection batch, final boolean flush, final boolean shutdown) { + startWorkerSpans(batch); + + startSpans(batch, "pipeline.filters"); // we know for now this comes from batch.collection() which returns a LinkedHashSet final Collection result = compiledFilters.compute(RubyArray.newArray(RubyUtil.RUBY, batch), flush, shutdown); - @SuppressWarnings({"unchecked"}) final RubyArray outputBatch = RubyUtil.RUBY.newArray(result.size()); + stopSpans(batch, "pipeline.filters"); + final RubyArray outputBatch = RubyUtil.RUBY.newArray(result.size()); copyNonCancelledEvents(result, outputBatch); compiledFilters.clear(); + startSpans(outputBatch, "pipeline.outputs"); compiledOutputs.compute(outputBatch, flush, shutdown); + stopSpans(batch, "pipeline.outputs"); + stopWorkerSpans(batch); return outputBatch.size(); } } @@ -577,4 +621,25 @@ private Collection compileDependencies( ).collect(Collectors.toList()); } } + + private static void stopSpans(Collection batch, String pipelinePhase) { + for (RubyEvent event : batch) { + Event javaEvent = event.getEvent(); + javaEvent.endSpan(pipelinePhase); + } + } + + @SuppressWarnings("try") + private static void startSpans(Collection batch, String pipelinePhase) { + for (RubyEvent event : batch) { + Event javaEvent = event.getEvent(); + Span workerSpan = javaEvent.spanForContext("worker"); + try (Scope scope = workerSpan.makeCurrent()) { + Span span = tracer.spanBuilder(pipelinePhase) + .setParent(Context.current()) + .startSpan(); + javaEvent.associateSpan(span, pipelinePhase); + } + } + } } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ec26b9b0735..b11989b1773 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -20,33 +20,8 @@ package org.logstash.execution; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.nio.file.FileStore; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.NoSuchAlgorithmException; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.trace.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jruby.Ruby; @@ -92,11 +67,11 @@ import org.logstash.instrument.metrics.MetricType; import org.logstash.instrument.metrics.NullMetricExt; import org.logstash.instrument.metrics.UpScaledMetric; -import org.logstash.instrument.metrics.timer.TimerMetric; import org.logstash.instrument.metrics.UptimeMetric; import org.logstash.instrument.metrics.counter.LongCounter; import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import org.logstash.instrument.metrics.gauge.NumberGauge; +import org.logstash.instrument.metrics.timer.TimerMetric; import org.logstash.plugins.ConfigVariableExpander; import org.logstash.plugins.factory.ExecutionContextFactoryExt; import org.logstash.plugins.factory.PluginFactoryExt; @@ -104,6 +79,33 @@ import org.logstash.secret.store.SecretStore; import org.logstash.secret.store.SecretStoreExt; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.logstash.OTelUtil.tracer; import static org.logstash.instrument.metrics.MetricKeys.*; import static org.logstash.instrument.metrics.UptimeMetric.ScaleUnits.MILLISECONDS; import static org.logstash.instrument.metrics.UptimeMetric.ScaleUnits.SECONDS; @@ -217,10 +219,16 @@ private void debugLogStackTrace(ConditionalEvaluationError err) { } } + @SuppressWarnings("try") @JRubyMethod(required = 4) public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args) throws IncompleteSourceWithMetadataException, NoSuchAlgorithmException { initialize(context, args[0], args[1], args[2]); + + Span span = tracer.spanBuilder("pipeline.initialize") + .setAttribute("pipeline.id", pipelineId().asJavaString()) + .startSpan(); + lirExecution = new CompiledPipeline( lir, new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init( @@ -245,6 +253,9 @@ public AbstractPipelineExt initialize(final ThreadContext context, final IRubyOb lir.getGraph().toString() ); } + + span.end(); + return this; } diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java index 9d849e42da4..09476b17119 100644 --- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java +++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java @@ -22,9 +22,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.logstash.OTelUtil; import org.logstash.config.ir.CompiledPipeline; +import org.logstash.ext.JrubyEventExtLibrary; /** * Pipeline execution worker, it's responsible to execute filters and output plugins for each {@link QueueBatch} that @@ -111,6 +116,9 @@ private boolean abortableCompute(QueueBatch batch, boolean flush, boolean shutdo boolean isNackBatch = false; try { execution.compute(batch, flush, shutdown); + for (JrubyEventExtLibrary.RubyEvent e : batch.events()) { + e.getEvent().endSpan("global"); + } } catch (Exception ex) { if (ex instanceof AbortedBatchException) { isNackBatch = true; diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java index 0a93a347c4d..02f8a7310c0 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java @@ -20,25 +20,75 @@ package org.logstash.ext; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; +import org.logstash.Event; +import org.logstash.OTelUtil; import org.logstash.RubyUtil; import org.logstash.common.LsQueueUtils; import org.logstash.execution.MemoryReadBatch; import org.logstash.execution.QueueBatch; import org.logstash.execution.QueueReadClientBase; +import javax.annotation.Nullable; + +import static org.logstash.OTelUtil.tracer; + /** * JRuby extension to provide an implementation of queue client for InMemory queue * */ @JRubyClass(name = "MemoryReadClient", parent = "QueueReadClientBase") public final class JrubyMemoryReadClientExt extends QueueReadClientBase { + private static final Logger LOGGER = LogManager.getLogger(JrubyMemoryReadClientExt.class); + private static final long serialVersionUID = 1L; + public static final TextMapGetter JAVA_EVENT_CARRIER_GETTER = new JavaEventGetter(OTelUtil.METADATA_OTEL_CONTEXT); + public static final TextMapGetter JAVA_EVENT_CARRIER_GLOBAL_GETTER = new JavaEventGetter(OTelUtil.METADATA_OTEL_FULLCONTEXT); + + private static class JavaEventGetter implements TextMapGetter { + + private final String contextFieldName; + + public JavaEventGetter(String contextFieldName) { + this.contextFieldName = contextFieldName; + } + + @Override + public Iterable keys(Event event) { + Map otelContextMap = retrieveContextMapFromMetadata(event); + return otelContextMap.keySet(); + } + + @SuppressWarnings("unchecked") + private Map retrieveContextMapFromMetadata(Event event) { + // TODO handle error cases, like no meta, no otel_context map, type conversion etc + return (Map) event.getMetadata().get(contextFieldName); + } + + @Nullable + @Override + public String get(@Nullable Event event, String s) { + Map otelContextMap = retrieveContextMapFromMetadata(event); + return otelContextMap.get(s); + } + }; + @SuppressWarnings({"rawtypes", "serial"}) private BlockingQueue queue; public JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass) { @@ -81,7 +131,37 @@ public QueueBatch newBatch() { @SuppressWarnings("unchecked") public QueueBatch readBatch() throws InterruptedException { final MemoryReadBatch batch = MemoryReadBatch.create(LsQueueUtils.drain(queue, batchSize, waitForNanos)); + recordQueueSpans(batch); startMetrics(batch); return batch; } + + private void recordQueueSpans(MemoryReadBatch batch) { + for (JrubyEventExtLibrary.RubyEvent e : batch.events()) { + Event javaEvent = e.getEvent(); + + // deserialize the otel context map from event carrier + ContextPropagators propagators = OTelUtil.openTelemetry.getPropagators(); + TextMapPropagator textMapPropagator = propagators.getTextMapPropagator(); + + // Extract and store the propagated span's SpanContext and other available concerns + // in the specified Context. + Context globalContext = textMapPropagator.extract(Context.current(), javaEvent, JAVA_EVENT_CARRIER_GLOBAL_GETTER); + Context queueContext = textMapPropagator.extract(Context.current(), javaEvent, JAVA_EVENT_CARRIER_GETTER); + + Span globalSpan = tracer.spanBuilder("pipeline.total") + .setParent(globalContext) + .setSpanKind(SpanKind.SERVER).startSpan(); + globalSpan.makeCurrent(); + + Span queueSpan = tracer.spanBuilder("pipeline.queue") +// .setParent(queueContext) + .setSpanKind(SpanKind.SERVER).startSpan(); + + queueSpan.makeCurrent(); + queueSpan.end(); + + javaEvent.associateSpan(globalSpan, "global"); + } + } } diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java index 0d920790947..637090396f1 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java @@ -21,17 +21,25 @@ package org.logstash.ext; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; import org.jruby.runtime.ThreadContext; import org.logstash.Event; +import org.logstash.OTelUtil; import org.logstash.RubyUtil; import org.logstash.common.LsQueueUtils; +import static org.logstash.OTelUtil.tracer; + @JRubyClass(name = "MemoryWriteClient") public final class JrubyMemoryWriteClientExt extends JRubyAbstractQueueWriteClientExt { @@ -58,13 +66,44 @@ public static JrubyMemoryWriteClientExt create( @Override protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context, final JrubyEventExtLibrary.RubyEvent event) throws InterruptedException { + Event carrierEvent = event.getEvent(); + propagateOtelContextInEvent(carrierEvent); queue.put(event); return this; } + @SuppressWarnings("try") + private static void propagateOtelContextInEvent(Event carrierEvent) { + Span span = tracer.spanBuilder("pipeline.total").startSpan(); + try (Scope unused = span.makeCurrent()) { + Span queueSpan = tracer.spanBuilder("pipeline.queue") + // TODO +// .setAttribute(AttributeKey.stringKey("pipeline.id"), "abracadabra") + .startSpan(); + propagateContextIntoEvent(carrierEvent, OTelUtil.METADATA_OTEL_FULLCONTEXT, Context.current()); + + try (Scope ignored = queueSpan.makeCurrent()) { + propagateContextIntoEvent(carrierEvent, OTelUtil.METADATA_OTEL_CONTEXT, Context.current()); + } + } + } + + private static void propagateContextIntoEvent(Event carrierEvent, String targetEventField, Context context) { + Map otemContextMap = new HashMap<>(); + carrierEvent.getMetadata().put(targetEventField, otemContextMap); + ContextPropagators propagators = OTelUtil.openTelemetry.getPropagators(); + propagators.getTextMapPropagator().inject(context, carrierEvent, + (javaEvent, key, value) -> otemContextMap.put(key, value)); + } + @Override public JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context, final Collection batch) throws InterruptedException { + // create new span for each event and propagate in the event itself + for (JrubyEventExtLibrary.RubyEvent event : batch) { + Event carrierEvent = event.getEvent(); + propagateOtelContextInEvent(carrierEvent); + } LsQueueUtils.addAll(queue, batch); return this; } @@ -72,7 +111,9 @@ public JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context, @Override public void push(Map event) { try { - queue.put(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event(event))); + Event carrierEvent = new Event(event); + propagateOtelContextInEvent(carrierEvent); + queue.put(JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, carrierEvent)); } catch (InterruptedException e) { throw new IllegalStateException(e); }