-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Extract reusable classes from flow metrics #18502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
ae0eed1
683e7c5
ad3f56f
a9caff4
747e637
6208424
b0b0721
77c8b7a
2b8ca9a
4ceaf14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Licensed to Elasticsearch B.V. under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch B.V. licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.logstash.instrument.metrics; | ||
|
|
||
| /** | ||
| * A {@link DatapointCapture} provides access to timing information | ||
| * captured at a specific point in time. | ||
| */ | ||
| abstract class DatapointCapture { | ||
|
|
||
| private final long nanoTime; | ||
|
|
||
| DatapointCapture(final long nanoTime) { | ||
| this.nanoTime = nanoTime; | ||
| } | ||
|
|
||
| /** | ||
| * @return the nanoTime of this capture, as provided at time | ||
| * of capture by the {@link FlowMetric}. | ||
| */ | ||
| public long nanoTime() { | ||
| return nanoTime; | ||
| } | ||
|
|
||
| DatapointCapture selectNewestCapture(final DatapointCapture proposed) { | ||
| if (proposed == null) { | ||
| return this; | ||
| } | ||
|
|
||
| return (this.nanoTime() > proposed.nanoTime()) ? this : proposed; | ||
| } | ||
|
|
||
| /** | ||
| * Internal tooling to select the younger of two captures | ||
| */ | ||
| static DatapointCapture selectNewestCaptureOf(final DatapointCapture existing, final DatapointCapture proposed) { | ||
| if (existing == null) { | ||
| return proposed; | ||
| } | ||
|
|
||
| return existing.selectNewestCapture(proposed); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,19 +23,15 @@ | |
| import org.apache.logging.log4j.Logger; | ||
| import org.logstash.util.SetOnceReference; | ||
|
|
||
| import java.lang.invoke.MethodHandles; | ||
| import java.lang.invoke.VarHandle; | ||
| import java.time.Duration; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.EnumSet; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.OptionalDouble; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.LongSupplier; | ||
| import java.util.function.ToLongFunction; | ||
| import java.util.stream.Collectors; | ||
|
|
@@ -100,7 +96,15 @@ public Map<String, Double> getValue() { | |
| this.retentionWindows.get() | ||
| .forEach(window -> window.baseline(currentCapture.nanoTime()) | ||
| .or(() -> windowDefaultBaseline(window)) | ||
| .map((baseline) -> calculateRate(currentCapture, baseline)) | ||
| .map(b -> { | ||
| if (b instanceof FlowCapture baseline) { | ||
| return calculateRate(currentCapture, baseline); | ||
| } else { | ||
| LOGGER.warn("Retention window `{}` provided a non-FlowCapture baseline `{}`; skipping rate calculation", | ||
| window.policy.policyName(), b); | ||
| return OptionalDouble.empty(); | ||
| } | ||
|
||
| }) | ||
| .orElseGet(OptionalDouble::empty) | ||
| .ifPresent((rate) -> rates.put(window.policy.policyName(), rate))); | ||
|
|
||
|
|
@@ -132,16 +136,6 @@ private static void injectIntoRetentionWindows(final List<RetentionWindow> reten | |
| retentionWindows.forEach((rw) -> rw.append(capture)); | ||
| } | ||
|
|
||
| /** | ||
| * Internal tooling to select the younger of two captures | ||
| */ | ||
| private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) { | ||
| if (existing == null) { return proposed; } | ||
| if (proposed == null) { return existing; } | ||
|
|
||
| return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed; | ||
| } | ||
|
|
||
| /** | ||
| * If a window's policy allows it to report before its retention has been reached, | ||
| * use our lifetime baseline as a default. | ||
|
|
@@ -180,183 +174,4 @@ Duration estimateExcessRetained(final ToLongFunction<FlowMetricRetentionPolicy> | |
| .sum(); | ||
| return Duration.ofNanos(cumulativeExcessRetained); | ||
| } | ||
|
|
||
| /** | ||
| * A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to | ||
| * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture | ||
| * that is older than the policy's allowed retention (if any). | ||
| * The implementation is similar to a singly-linked list whose youngest captures are at | ||
| * the tail and oldest captures are at the head, with an additional pre-tail stage. | ||
| * Compaction is always done at read-time and occasionally at write-time. | ||
| * Both reads and writes are non-blocking and concurrency-safe. | ||
| */ | ||
| private static class RetentionWindow { | ||
| private final AtomicReference<FlowCapture> stagedCapture = new AtomicReference<>(); | ||
| private final AtomicReference<Node> tail; | ||
| private final AtomicReference<Node> head; | ||
| private final FlowMetricRetentionPolicy policy; | ||
|
|
||
| RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) { | ||
| this.policy = policy; | ||
| final Node zeroNode = new Node(zeroCapture); | ||
| this.head = new AtomicReference<>(zeroNode); | ||
| this.tail = new AtomicReference<>(zeroNode); | ||
| } | ||
|
|
||
| /** | ||
| * Append the newest {@link FlowCapture} into this {@link RetentionWindow}, | ||
| * while respecting our {@link FlowMetricRetentionPolicy}. | ||
| * We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but | ||
| * expect callers of this method to minimize lag between instantiating the capture | ||
| * and appending it. | ||
| * | ||
| * @param newestCapture the newest capture to stage | ||
| */ | ||
| private void append(final FlowCapture newestCapture) { | ||
| final Node casTail = this.tail.getAcquire(); // for CAS | ||
| final long newestCaptureNanoTime = newestCapture.nanoTime(); | ||
|
|
||
| // stage our newest capture unless it is older than the currently-staged capture | ||
| final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture); | ||
|
|
||
| // promote our previously-staged capture IFF our newest capture is too far | ||
| // ahead of the current tail to support policy's resolution. | ||
| if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { | ||
| // attempt to set an _unlinked_ Node to our tail | ||
| final Node proposedNode = new Node(previouslyStaged); | ||
| if (this.tail.compareAndSet(casTail, proposedNode)) { | ||
| // if we succeeded at setting an unlinked node, link to it from our old tail | ||
| casTail.setNext(proposedNode); | ||
|
|
||
| // perform a force-compaction of our head if necessary, | ||
| // detected using plain memory access | ||
| final Node currentHead = head.getPlain(); | ||
| final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); | ||
| if (LOGGER.isTraceEnabled()) { | ||
| LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); | ||
| } | ||
| if (headAgeNanos > policy.forceCompactionNanos()) { | ||
| final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); | ||
| if (LOGGER.isDebugEnabled()) { | ||
| final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); | ||
| LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "RetentionWindow{" + | ||
| "policy=" + policy.policyName() + | ||
| " id=" + System.identityHashCode(this) + | ||
| '}'; | ||
| } | ||
|
|
||
| /** | ||
| * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. | ||
| * @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older | ||
| * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one | ||
| * exists, and is otherwise empty. | ||
| */ | ||
| public Optional<FlowCapture> baseline(final long nanoTime) { | ||
| final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); | ||
| final Node head = compactHead(barrier); | ||
| if (head.captureNanoTime() <= barrier) { | ||
| return Optional.of(head.capture); | ||
| } else { | ||
| return Optional.empty(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @return a computationally-expensive estimate of the number of captures in this window, | ||
| * using plain memory access. This should NOT be run in unguarded production code. | ||
| */ | ||
| private static int estimateSize(final Node headNode) { | ||
| int i = 1; // assume we have one additional staged | ||
| // NOTE: we chase the provided headNode's tail with plain-gets, | ||
| // which tolerates missed appends from other threads. | ||
| for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; } | ||
| return i; | ||
| } | ||
|
|
||
| /** | ||
| * @see RetentionWindow#estimateSize(Node) | ||
| */ | ||
| private int estimateSize() { | ||
| return estimateSize(this.head.getPlain()); | ||
| } | ||
|
|
||
| /** | ||
| * @param barrier a nanoTime that will NOT be crossed during compaction | ||
| * @return the head node after compaction up to the provided barrier. | ||
| */ | ||
| private Node compactHead(final long barrier) { | ||
| return this.head.updateAndGet((existingHead) -> { | ||
| final Node proposedHead = existingHead.seekWithoutCrossing(barrier); | ||
| return Objects.requireNonNullElse(proposedHead, existingHead); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Internal testing support | ||
| */ | ||
| private long excessRetained(final long currentNanoTime, final ToLongFunction<FlowMetricRetentionPolicy> retentionWindowFunction) { | ||
| final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); | ||
| return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); | ||
| } | ||
|
|
||
| /** | ||
| * A {@link Node} holds a single {@link FlowCapture} and | ||
| * may link ahead to the next {@link Node}. | ||
| * It is an implementation detail of {@link RetentionWindow}. | ||
| */ | ||
| private static class Node { | ||
| private static final VarHandle NEXT; | ||
| static { | ||
| try { | ||
| MethodHandles.Lookup l = MethodHandles.lookup(); | ||
| NEXT = l.findVarHandle(Node.class, "next", Node.class); | ||
| } catch (ReflectiveOperationException e) { | ||
| throw new ExceptionInInitializerError(e); | ||
| } | ||
| } | ||
|
|
||
| private final FlowCapture capture; | ||
| private volatile Node next; | ||
|
|
||
| Node(final FlowCapture capture) { | ||
| this.capture = capture; | ||
| } | ||
|
|
||
| Node seekWithoutCrossing(final long barrier) { | ||
| Node newestOlderThanThreshold = null; | ||
| Node candidate = this; | ||
|
|
||
| while(candidate != null && candidate.captureNanoTime() < barrier) { | ||
| newestOlderThanThreshold = candidate; | ||
| candidate = candidate.getNext(); | ||
| } | ||
| return newestOlderThanThreshold; | ||
| } | ||
|
|
||
| long captureNanoTime() { | ||
| return this.capture.nanoTime(); | ||
| } | ||
|
|
||
| void setNext(final Node nextNode) { | ||
| next = nextNode; | ||
| } | ||
|
|
||
| Node getNext() { | ||
| return next; | ||
| } | ||
|
|
||
| Node getNextPlain() { | ||
| return (Node)NEXT.get(this); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.