Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.instrument.metrics;

/**
* A {@link DatapointCapture} provides access to timing information
* captured at a specific point in time.
*/
abstract class DatapointCapture {

private final long nanoTime;

DatapointCapture(final long nanoTime) {
this.nanoTime = nanoTime;
}

/**
* @return the nanoTime of this capture, as provided at time
* of capture by the {@link FlowMetric}.
*/
public long nanoTime() {
return nanoTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@
import org.apache.logging.log4j.Logger;
import org.logstash.util.SetOnceReference;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
Expand All @@ -54,7 +50,7 @@ public class ExtendedFlowMetric extends BaseFlowMetric {
private final Collection<? extends FlowMetricRetentionPolicy> retentionPolicies;

// set-once atomic reference; see ExtendedFlowMetric#appendCapture(FlowCapture)
private final SetOnceReference<List<RetentionWindow>> retentionWindows = SetOnceReference.unset();
private final SetOnceReference<List<RetentionWindow<FlowCapture>>> retentionWindows = SetOnceReference.unset();

public ExtendedFlowMetric(final String name,
final Metric<? extends Number> numeratorMetric,
Expand Down Expand Up @@ -98,9 +94,9 @@ public Map<String, Double> getValue() {
final Map<String, Double> rates = new LinkedHashMap<>();

this.retentionWindows.get()
.forEach(window -> window.baseline(currentCapture.nanoTime())
.forEach(window -> baseline(window, currentCapture.nanoTime())
.or(() -> windowDefaultBaseline(window))
.map((baseline) -> calculateRate(currentCapture, baseline))
.map(b -> calculateRate(currentCapture, b))
.orElseGet(OptionalDouble::empty)
.ifPresent((rate) -> rates.put(window.policy.policyName(), rate)));

Expand All @@ -109,6 +105,16 @@ public Map<String, Double> getValue() {
return Collections.unmodifiableMap(rates);
}

/**
* @param nanoTime the nanoTime of the capture for which we are retrieving a baseline.
* @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older
* than this window's {@link FlowMetricRetentionPolicy} allowed retention if one
* exists, and is otherwise empty.
*/
Optional<FlowCapture> baseline(RetentionWindow<FlowCapture> window, final long nanoTime) {
return window.returnHead(nanoTime);
}

/**
* Appends the given {@link FlowCapture} to the existing {@link RetentionWindow}s, XOR creates
* a new list of {@link RetentionWindow} using the provided {@link FlowCapture} as a baseline.
Expand All @@ -121,32 +127,22 @@ private void appendCapture(final FlowCapture capture) {
);
}

private static List<RetentionWindow> initRetentionWindows(final Collection<? extends FlowMetricRetentionPolicy> retentionPolicies,
private static List<RetentionWindow<FlowCapture>> initRetentionWindows(final Collection<? extends FlowMetricRetentionPolicy> retentionPolicies,
final FlowCapture capture) {
return retentionPolicies.stream()
.map((p) -> new RetentionWindow(p, capture))
.map((p) -> new RetentionWindow<FlowCapture>(p, capture))
.collect(Collectors.toUnmodifiableList());
}

private static void injectIntoRetentionWindows(final List<RetentionWindow> retentionWindows, final FlowCapture capture) {
private static void injectIntoRetentionWindows(final List<RetentionWindow<FlowCapture>> retentionWindows, final FlowCapture capture) {
retentionWindows.forEach((rw) -> rw.append(capture));
}

/**
* Internal tooling to select the younger of two captures
*/
private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) {
if (existing == null) { return proposed; }
if (proposed == null) { return existing; }

return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed;
}

/**
* If a window's policy allows it to report before its retention has been reached,
* use our lifetime baseline as a default.
*/
private Optional<FlowCapture> windowDefaultBaseline(final RetentionWindow window) {
private Optional<FlowCapture> windowDefaultBaseline(final RetentionWindow<FlowCapture> window) {
if (window.policy.reportBeforeSatisfied()) {
return this.lifetimeBaseline.asOptional();
}
Expand Down Expand Up @@ -180,183 +176,4 @@ Duration estimateExcessRetained(final ToLongFunction<FlowMetricRetentionPolicy>
.sum();
return Duration.ofNanos(cumulativeExcessRetained);
}

/**
* A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to
* meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture
* that is older than the policy's allowed retention (if any).
* The implementation is similar to a singly-linked list whose youngest captures are at
* the tail and oldest captures are at the head, with an additional pre-tail stage.
* Compaction is always done at read-time and occasionally at write-time.
* Both reads and writes are non-blocking and concurrency-safe.
*/
private static class RetentionWindow {
private final AtomicReference<FlowCapture> stagedCapture = new AtomicReference<>();
private final AtomicReference<Node> tail;
private final AtomicReference<Node> head;
private final FlowMetricRetentionPolicy policy;

RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) {
this.policy = policy;
final Node zeroNode = new Node(zeroCapture);
this.head = new AtomicReference<>(zeroNode);
this.tail = new AtomicReference<>(zeroNode);
}

/**
* Append the newest {@link FlowCapture} into this {@link RetentionWindow},
* while respecting our {@link FlowMetricRetentionPolicy}.
* We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but
* expect callers of this method to minimize lag between instantiating the capture
* and appending it.
*
* @param newestCapture the newest capture to stage
*/
private void append(final FlowCapture newestCapture) {
final Node casTail = this.tail.getAcquire(); // for CAS
final long newestCaptureNanoTime = newestCapture.nanoTime();

// stage our newest capture unless it is older than the currently-staged capture
final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture);

// promote our previously-staged capture IFF our newest capture is too far
// ahead of the current tail to support policy's resolution.
if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) {
// attempt to set an _unlinked_ Node to our tail
final Node proposedNode = new Node(previouslyStaged);
if (this.tail.compareAndSet(casTail, proposedNode)) {
// if we succeeded at setting an unlinked node, link to it from our old tail
casTail.setNext(proposedNode);

// perform a force-compaction of our head if necessary,
// detected using plain memory access
final Node currentHead = head.getPlain();
final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos));
}
if (headAgeNanos > policy.forceCompactionNanos()) {
final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos()));
if (LOGGER.isDebugEnabled()) {
final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime());
LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos));
}
}
}
}
}

@Override
public String toString() {
return "RetentionWindow{" +
"policy=" + policy.policyName() +
" id=" + System.identityHashCode(this) +
'}';
}

/**
* @param nanoTime the nanoTime of the capture for which we are retrieving a baseline.
* @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older
* than this window's {@link FlowMetricRetentionPolicy} allowed retention if one
* exists, and is otherwise empty.
*/
public Optional<FlowCapture> baseline(final long nanoTime) {
final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos());
final Node head = compactHead(barrier);
if (head.captureNanoTime() <= barrier) {
return Optional.of(head.capture);
} else {
return Optional.empty();
}
}

/**
* @return a computationally-expensive estimate of the number of captures in this window,
* using plain memory access. This should NOT be run in unguarded production code.
*/
private static int estimateSize(final Node headNode) {
int i = 1; // assume we have one additional staged
// NOTE: we chase the provided headNode's tail with plain-gets,
// which tolerates missed appends from other threads.
for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; }
return i;
}

/**
* @see RetentionWindow#estimateSize(Node)
*/
private int estimateSize() {
return estimateSize(this.head.getPlain());
}

/**
* @param barrier a nanoTime that will NOT be crossed during compaction
* @return the head node after compaction up to the provided barrier.
*/
private Node compactHead(final long barrier) {
return this.head.updateAndGet((existingHead) -> {
final Node proposedHead = existingHead.seekWithoutCrossing(barrier);
return Objects.requireNonNullElse(proposedHead, existingHead);
});
}

/**
* Internal testing support
*/
private long excessRetained(final long currentNanoTime, final ToLongFunction<FlowMetricRetentionPolicy> retentionWindowFunction) {
final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy));
return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime()));
}

/**
* A {@link Node} holds a single {@link FlowCapture} and
* may link ahead to the next {@link Node}.
* It is an implementation detail of {@link RetentionWindow}.
*/
private static class Node {
private static final VarHandle NEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

private final FlowCapture capture;
private volatile Node next;

Node(final FlowCapture capture) {
this.capture = capture;
}

Node seekWithoutCrossing(final long barrier) {
Node newestOlderThanThreshold = null;
Node candidate = this;

while(candidate != null && candidate.captureNanoTime() < barrier) {
newestOlderThanThreshold = candidate;
candidate = candidate.getNext();
}
return newestOlderThanThreshold;
}

long captureNanoTime() {
return this.capture.nanoTime();
}

void setNext(final Node nextNode) {
next = nextNode;
}

Node getNext() {
return next;
}

Node getNextPlain() {
return (Node)NEXT.get(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,16 @@
* point-in-time data for a pair of {@link Metric}s.
* It is immutable.
*/
class FlowCapture {
class FlowCapture extends DatapointCapture {
private final Number numerator;
private final Number denominator;

private final long nanoTime;

FlowCapture(final long nanoTime,
final Number numerator,
final Number denominator) {
super(nanoTime);
this.numerator = numerator;
this.denominator = denominator;
this.nanoTime = nanoTime;
}

/**
* @return the nanoTime of this capture, as provided at time
* of capture by the {@link FlowMetric}.
*/
public long nanoTime() {
return nanoTime;
}

/**
Expand All @@ -65,7 +55,7 @@ public BigDecimal denominator() {
@Override
public String toString() {
return getClass().getSimpleName() +"{" +
"nanoTimestamp=" + nanoTime +
"nanoTimestamp=" + nanoTime() +
" numerator=" + numerator() +
" denominator=" + denominator() +
'}';
Expand Down
Loading