-
Notifications
You must be signed in to change notification settings - Fork 324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added (initial) support for compressing spans #2477
Changes from 2 commits
94c8cbc
8667bfb
cd2a889
891dc23
40b709f
dbc556e
147a829
a11c04f
d595005
f5c7047
9303d31
d879444
dca0fb1
32e720b
0bd7e7c
33a6909
5fff5ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package co.elastic.apm.agent.impl.transaction; | ||
|
||
import co.elastic.apm.agent.objectpool.Recyclable; | ||
|
||
import javax.annotation.Nullable; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class Composite implements Recyclable { | ||
|
||
private final AtomicInteger count = new AtomicInteger(0); | ||
|
||
private final AtomicLong sum = new AtomicLong(0L); | ||
|
||
@Nullable | ||
private String compressionStrategy; | ||
|
||
public void init(long sum, String compressionStrategy) { | ||
this.count.set(1); | ||
this.sum.set(sum); | ||
this.compressionStrategy = compressionStrategy; | ||
} | ||
|
||
public int getCount() { | ||
return count.get(); | ||
} | ||
|
||
public void increaseCount() { | ||
count.incrementAndGet(); | ||
} | ||
|
||
public long getSum() { | ||
return sum.get(); | ||
} | ||
|
||
public double getSumMs() { | ||
return sum.get() / 1000.0; | ||
} | ||
|
||
public void increaseSum(long delta) { | ||
this.sum.addAndGet(delta); | ||
} | ||
|
||
public String getCompressionStrategy() { | ||
return compressionStrategy; | ||
} | ||
|
||
@Override | ||
public void resetState() { | ||
this.count.set(0); | ||
this.sum.set(0L); | ||
this.compressionStrategy = null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,9 +29,11 @@ | |
import co.elastic.apm.agent.objectpool.Recyclable; | ||
import co.elastic.apm.agent.sdk.logging.Logger; | ||
import co.elastic.apm.agent.sdk.logging.LoggerFactory; | ||
import co.elastic.apm.agent.util.StringBuilderUtils; | ||
|
||
import javax.annotation.Nullable; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public class Span extends AbstractSpan<Span> implements Recyclable { | ||
|
@@ -65,6 +67,7 @@ public class Span extends AbstractSpan<Span> implements Recyclable { | |
* Any other arbitrary data captured by the agent, optionally provided by the user | ||
*/ | ||
private final SpanContext context = new SpanContext(); | ||
private final Composite composite = new Composite(); | ||
@Nullable | ||
private Throwable stacktrace; | ||
@Nullable | ||
|
@@ -151,6 +154,14 @@ public SpanContext getContext() { | |
return context; | ||
} | ||
|
||
public boolean isComposite() { | ||
return composite.getCount() > 0; | ||
} | ||
|
||
public Composite getComposite() { | ||
return composite; | ||
} | ||
|
||
/** | ||
* Keywords of specific relevance in the span's domain (eg: 'db', 'template', 'ext', etc) | ||
*/ | ||
|
@@ -292,13 +303,119 @@ public void beforeEnd(long epochMicros) { | |
|
||
@Override | ||
protected void afterEnd() { | ||
this.tracer.endSpan(this); | ||
if (tracer.getConfig(CoreConfiguration.class).isSpanCompressionEnabled()) { | ||
Span buffered = parent.bufferedSpan.get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this may lead to multiple threads concurrently having access to the buffered span instance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Multiple threads can concurrently have access, that's okay here. It is resolved at the update or report stage which you can fully reason about here (nice when it's concurrent). There are 3 possible atomic concurrent updates, and in each case either it succeeds (true) or fails (false). The cases are complete
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, concurrent access is fine here and also covered by tests. |
||
if (!isCompressionEligible()) { | ||
if (buffered != null) { | ||
if (parent.bufferedSpan.compareAndSet(buffered, null)) { | ||
this.tracer.endSpan(buffered); | ||
} | ||
} | ||
this.tracer.endSpan(this); | ||
return; | ||
} | ||
if (buffered == null) { | ||
if (!parent.bufferedSpan.compareAndSet(null, this)) { | ||
this.tracer.endSpan(this); | ||
jackshirazi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return; | ||
} | ||
if (!buffered.tryToCompress(this)) { | ||
if (!parent.bufferedSpan.compareAndSet(buffered, this)) { | ||
tobiasstadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.tracer.endSpan(buffered); | ||
tobiasstadler marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.tracer.endSpan(this); | ||
jackshirazi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
this.tracer.endSpan(buffered); | ||
} | ||
} else { | ||
decrementReferences(); | ||
} | ||
} else { | ||
this.tracer.endSpan(this); | ||
} | ||
} | ||
|
||
private boolean isCompressionEligible() { | ||
return isExit() && isDiscardable() && (getOutcome() == null || getOutcome() == Outcome.SUCCESS); | ||
} | ||
|
||
private boolean tryToCompress(Span sibling) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this method could also be simplified when we can guarantee that we have exclusive access to both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @felixbarny I agree your solution was the more elegant one. I decided that in the high contention case (which is really when these alternatives matter) we want to avoid the backoff and retry loop, ie that under conflict it's better to drop the compression and allow the thread to proceed asap rather than maximally try to compress. So felt this solution was acceptable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note the loops in the try are getting maxes after compression has succeeded, so should not really cause contention, but we may need to review There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for explaining, makes sense. I agree that the approach taken in this PR is probably the one that causes less contention. I find it a little harder to reason about and verify correctness as there's more that can happen concurrently. But from what I can tell, it seems like all the cases are handled properly. |
||
boolean canBeCompressed = isComposite() ? tryToCompressComposite(sibling) : tryToCompressRegular(sibling); | ||
if (!canBeCompressed) { | ||
return false; | ||
} | ||
|
||
long newDuration = sibling.getTimestamp() + sibling.duration - getTimestamp(); | ||
synchronized (composite) { | ||
if (newDuration > duration) { | ||
duration = newDuration; | ||
} | ||
} | ||
|
||
composite.increaseCount(); | ||
composite.increaseSum(sibling.duration); | ||
|
||
return true; | ||
} | ||
|
||
private boolean tryToCompressRegular(Span sibling) { | ||
if (!isSameKind(sibling)) { | ||
return false; | ||
} | ||
|
||
long maxExactMatchDuration = tracer.getConfig(CoreConfiguration.class).getSpanCompressionExactMatchMaxDuration().getMicros(); | ||
long maxSameKindDuration = tracer.getConfig(CoreConfiguration.class).getSpanCompressionSameKindMaxDuration().getMicros(); | ||
|
||
boolean isAlreadyComposite; | ||
synchronized (composite) { | ||
isAlreadyComposite = isComposite(); | ||
if (!isAlreadyComposite) { | ||
if (StringBuilderUtils.equals(name, sibling.name)) { | ||
if (duration <= maxExactMatchDuration && sibling.duration <= maxExactMatchDuration) { | ||
composite.init(duration, "exact_match"); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
if (duration <= maxSameKindDuration && sibling.duration <= maxSameKindDuration) { | ||
composite.init(duration, "same_kind"); | ||
name.setLength(0); | ||
name.append("Calls to ").append(context.getDestination().getService().getResource()); | ||
return true; | ||
} | ||
} | ||
} | ||
|
||
return isAlreadyComposite && tryToCompressComposite(sibling); | ||
} | ||
|
||
private boolean tryToCompressComposite(Span sibling) { | ||
switch (composite.getCompressionStrategy()) { | ||
case "exact_match": | ||
long maxExactMatchDuration = tracer.getConfig(CoreConfiguration.class).getSpanCompressionExactMatchMaxDuration().getMicros(); | ||
return isSameKind(sibling) && StringBuilderUtils.equals(name, sibling.name) && sibling.duration <= maxExactMatchDuration; | ||
|
||
case "same_kind": | ||
long maxSameKindDuration = tracer.getConfig(CoreConfiguration.class).getSpanCompressionSameKindMaxDuration().getMicros(); | ||
return isSameKind(sibling) && sibling.duration <= maxSameKindDuration; | ||
default: | ||
} | ||
|
||
return false; | ||
} | ||
|
||
private boolean isSameKind(Span other) { | ||
return Objects.equals(type, other.type) | ||
&& Objects.equals(subtype, other.subtype) | ||
&& StringBuilderUtils.equals(context.getDestination().getService().getResource(), other.context.getDestination().getService().getResource()); | ||
} | ||
|
||
@Override | ||
public void resetState() { | ||
super.resetState(); | ||
context.resetState(); | ||
composite.resetState(); | ||
stacktrace = null; | ||
type = null; | ||
subtype = null; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until #2083 and #2084 are done