Skip to content

Commit

Permalink
Signals processing (#1014)
Browse files Browse the repository at this point in the history
  • Loading branch information
LikeTheSalad authored Sep 7, 2023
1 parent 7e4637a commit d56ea37
Show file tree
Hide file tree
Showing 12 changed files with 607 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ components:
- HaloFour
noop-api:
- jack-berg
processors:
- LikeTheSalad
- breedx-splk
prometheus-collector:
- jkwatson
resource-providers:
Expand Down
10 changes: 10 additions & 0 deletions processors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Processors

This module provides tools to intercept and process signals globally.

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Jason Plumb](https://github.com/breedx-splk), Splunk

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
17 changes: 17 additions & 0 deletions processors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "Tools to intercept and process signals globally."
otelJava.moduleName.set("io.opentelemetry.contrib.processors")

java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {
api("io.opentelemetry:opentelemetry-sdk")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;

/** Intercepts logs before delegating them to the real exporter. */
public final class InterceptableLogRecordExporter implements LogRecordExporter {
private final LogRecordExporter delegate;
private final Interceptor<LogRecordData> interceptor;

public InterceptableLogRecordExporter(
LogRecordExporter delegate, Interceptor<LogRecordData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(interceptor.interceptAll(logs));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;

/** Intercepts metrics before delegating them to the real exporter. */
public final class InterceptableMetricExporter implements MetricExporter {
private final MetricExporter delegate;
private final Interceptor<MetricData> interceptor;

public InterceptableMetricExporter(MetricExporter delegate, Interceptor<MetricData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return delegate.export(interceptor.interceptAll(metrics));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return delegate.getAggregationTemporality(instrumentType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;

/** Intercepts spans before delegating them to the real exporter. */
public final class InterceptableSpanExporter implements SpanExporter {
private final SpanExporter delegate;
private final Interceptor<SpanData> interceptor;

public InterceptableSpanExporter(SpanExporter delegate, Interceptor<SpanData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return delegate.export(interceptor.interceptAll(spans));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.api;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;

/**
* Intercepts a signal before it gets exported. The signal can get updated and/or filtered out based
* on each interceptor implementation.
*/
public interface Interceptor<T> {

/**
* Intercepts a signal.
*
* @param item The signal object.
* @return The received signal modified (or null for excluding this signal from getting exported).
* If there's no operation needed to be done for a specific signal, it should be returned as
* is.
*/
@Nullable
T intercept(T item);

/** Intercepts a collection of signals. */
default Collection<T> interceptAll(Collection<T> items) {
List<T> result = new ArrayList<>();

for (T item : items) {
T intercepted = intercept(item);
if (intercepted != null) {
result.add(intercepted);
}
}

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.common;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;

/** Allows to run an item through a list of interceptors in the order they were added. */
public final class ComposableInterceptor<T> implements Interceptor<T> {
private final CopyOnWriteArrayList<Interceptor<T>> interceptors = new CopyOnWriteArrayList<>();

public void add(Interceptor<T> interceptor) {
interceptors.addIfAbsent(interceptor);
}

@Nullable
@Override
public T intercept(T item) {
T intercepted = item;
for (Interceptor<T> interceptor : interceptors) {
intercepted = interceptor.intercept(intercepted);
if (intercepted == null) {
break;
}
}
return intercepted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.contrib.interceptor.common.ComposableInterceptor;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import java.util.List;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class InterceptableLogRecordExporterTest {
private InMemoryLogRecordExporter memoryLogRecordExporter;
private Logger logger;
private ComposableInterceptor<LogRecordData> interceptor;

@BeforeEach
void setUp() {
memoryLogRecordExporter = InMemoryLogRecordExporter.create();
interceptor = new ComposableInterceptor<>();
logger =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
SimpleLogRecordProcessor.create(
new InterceptableLogRecordExporter(memoryLogRecordExporter, interceptor)))
.build()
.get("TestScope");
}

@Test
void verifyLogModification() {
interceptor.add(
item -> {
ModifiableLogRecordData modified = new ModifiableLogRecordData(item);
modified.attributes.put("global.attr", "from interceptor");
return modified;
});

logger
.logRecordBuilder()
.setBody("One log")
.setAttribute(AttributeKey.stringKey("local.attr"), "local")
.emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(1, finishedLogRecordItems.size());
LogRecordData logRecordData = finishedLogRecordItems.get(0);
assertEquals(2, logRecordData.getAttributes().size());
assertEquals(
"from interceptor",
logRecordData.getAttributes().get(AttributeKey.stringKey("global.attr")));
assertEquals("local", logRecordData.getAttributes().get(AttributeKey.stringKey("local.attr")));
}

@Test
void verifyLogFiltering() {
interceptor.add(
item -> {
if (item.getBody().asString().contains("deleted")) {
return null;
}
return item;
});

logger.logRecordBuilder().setBody("One log").emit();
logger.logRecordBuilder().setBody("This log will be deleted").emit();
logger.logRecordBuilder().setBody("Another log").emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(2, finishedLogRecordItems.size());
assertEquals("One log", finishedLogRecordItems.get(0).getBody().asString());
assertEquals("Another log", finishedLogRecordItems.get(1).getBody().asString());
}

private static class ModifiableLogRecordData implements LogRecordData {
private final LogRecordData delegate;
private final AttributesBuilder attributes = Attributes.builder();

private ModifiableLogRecordData(LogRecordData delegate) {
this.delegate = delegate;
}

@Override
public Resource getResource() {
return delegate.getResource();
}

@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return delegate.getInstrumentationScopeInfo();
}

@Override
public long getTimestampEpochNanos() {
return delegate.getTimestampEpochNanos();
}

@Override
public long getObservedTimestampEpochNanos() {
return delegate.getObservedTimestampEpochNanos();
}

@Override
public SpanContext getSpanContext() {
return delegate.getSpanContext();
}

@Override
public Severity getSeverity() {
return delegate.getSeverity();
}

@Nullable
@Override
public String getSeverityText() {
return delegate.getSeverityText();
}

@Override
public Body getBody() {
return delegate.getBody();
}

@Override
public Attributes getAttributes() {
return attributes.putAll(delegate.getAttributes()).build();
}

@Override
public int getTotalAttributeCount() {
return delegate.getTotalAttributeCount();
}
}
}
Loading

0 comments on commit d56ea37

Please sign in to comment.