Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions logstash-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ dependencies {
exclude group: 'com.google.guava', module: 'guava'
}
implementation 'org.javassist:javassist:3.30.2-GA'
implementation 'org.hdrhistogram:HdrHistogram:2.2.2'
testImplementation "org.apache.logging.log4j:log4j-core:${log4jVersion}:tests"
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'org.hamcrest:hamcrest-library:2.2'
Expand Down
26 changes: 26 additions & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,36 @@ def refine_batch_metrics(stats)
result[:event_count][:average][window] = event_count_average_flow_metric[key]&.round if event_count_average_flow_metric[key]
result[:byte_size][:average][window] = byte_size_average_flow_metric[key]&.round if byte_size_average_flow_metric[key]
end

if stats[:batch][:batch_byte_size]
# stats[:batch][:batch_byte_size][:histogram] is an instance of org.logstash.instrument.metrics.HdrHistogramFlowMetric,
# so we need to call "value" on it to get the map of sub-metrics, keyed by window name, whose entries
# hold the histogram percentiles as org.logstash.instrument.metrics.HistogramMetricData
byte_size_histogram = stats[:batch][:batch_byte_size][:histogram]
[:last_1_minute, :last_5_minutes, :last_15_minutes].each do |window|
reshape_histogram_percentiles_for_window(:byte_size, byte_size_histogram, window, result) if byte_size_histogram.value[window.to_s]
end
end
if stats[:batch][:batch_event_count]
event_count_histogram = stats[:batch][:batch_event_count][:histogram]
[:last_1_minute, :last_5_minutes, :last_15_minutes].each do |window|
reshape_histogram_percentiles_for_window(:event_count, event_count_histogram, window, result) if event_count_histogram.value[window.to_s]
end
end
result
end
private :refine_batch_metrics

# Copies the 50th and 90th percentile of a single flow window from a histogram
# metric into the API result hash, rounding each to an integer.
#
# @param target_field [Symbol] key of +result+ to populate (callers pass :byte_size or :event_count)
# @param histogram_metric [#value] metric whose +value+ maps window names (as Strings) to
#   objects responding to +get50Percentile+ and +get90Percentile+
# @param window [Symbol] flow window to copy, e.g. :last_1_minute
# @param result [Hash] hash being built; +result[target_field]+ must already be a Hash and is mutated in place
# @return [Integer, nil] the rounded p90 that was stored, or nil when the window has no data
def reshape_histogram_percentiles_for_window(target_field, histogram_metric, window, result)
  # Resolve the window first: a window that has not reported yet is skipped
  # instead of leaving empty :p50/:p90 hashes behind and raising NoMethodError.
  histogram_data = histogram_metric.value[window.to_s]
  return unless histogram_data

  percentiles = result[target_field]
  percentiles[:p50] ||= {}
  percentiles[:p90] ||= {}

  percentiles[:p50][window] = histogram_data.get50Percentile.round
  percentiles[:p90][window] = histogram_data.get90Percentile.round
end
private :reshape_histogram_percentiles_for_window

def report(stats, extended_stats = nil, opts = {})
ret = {
:events => stats[:events],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.HdrHistogramFlowMetric;
import org.logstash.instrument.metrics.HistogramFlowMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;

Expand All @@ -23,6 +25,8 @@ class QueueReadClientBatchMetrics {
private LongCounter pipelineMetricBatchCount;
private LongCounter pipelineMetricBatchByteSize;
private LongCounter pipelineMetricBatchTotalEvents;
private HistogramFlowMetric pipelineMetricBatchByteSizeFlowHistogram;
private HistogramFlowMetric pipelineMetricBatchEventCountFlowHistogram;
private final SecureRandom random = new SecureRandom();
private LazyDelegatingGauge currentBatchDimensions;

Expand All @@ -39,6 +43,12 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY);
pipelineMetricBatchByteSizeFlowHistogram = batchNamespace.asApiMetric()
.namespace("batch_byte_size")
.register("histogram", HdrHistogramFlowMetric.FACTORY);
pipelineMetricBatchEventCountFlowHistogram = batchNamespace.asApiMetric()
.namespace("batch_event_count")
.register("histogram", HdrHistogramFlowMetric.FACTORY);
}
}

Expand Down Expand Up @@ -72,9 +82,12 @@ private void updateBatchSizeMetric(QueueBatch batch) {
totalByteSize += rubyEvent.getEvent().estimateMemory();
}
pipelineMetricBatchCount.increment();
pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
int batchEventsCount = batch.filteredSize();
pipelineMetricBatchTotalEvents.increment(batchEventsCount);
pipelineMetricBatchByteSize.increment(totalByteSize);
currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize));
currentBatchDimensions.set(Arrays.asList(batchEventsCount, totalByteSize));
pipelineMetricBatchByteSizeFlowHistogram.recordValue(totalByteSize);
pipelineMetricBatchEventCountFlowHistogram.recordValue(batchEventsCount);
} catch (IllegalArgumentException e) {
LOG.error("Failed to calculate batch byte size for metrics", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.instrument.metrics;

/**
 * A {@link DatapointCapture} is the base type for a datapoint taken at a specific
 * point in time. It carries the {@code nanoTime} of the capture and knows how to
 * choose the more recent of two captures.
 */
abstract class DatapointCapture {

    private final long nanoTime;

    /**
     * @param nanoTime the nanoTime to associate with this capture
     */
    DatapointCapture(final long nanoTime) {
        this.nanoTime = nanoTime;
    }

    /**
     * @return the nanoTime of this capture, as provided at time
     *         of capture by the {@link FlowMetric}.
     */
    public long nanoTime() {
        return nanoTime;
    }

    /**
     * Chooses between this capture and the proposed one.
     *
     * @param proposed the candidate capture, possibly {@code null}
     * @return this capture when {@code proposed} is {@code null} or has a strictly
     *         smaller nanoTime; otherwise {@code proposed} (ties go to {@code proposed})
     */
    DatapointCapture selectNewestCapture(final DatapointCapture proposed) {
        final boolean proposedWins = proposed != null && this.nanoTime() <= proposed.nanoTime();
        return proposedWins ? proposed : this;
    }

    /**
     * Internal tooling to select the younger of two captures, either of which may be {@code null}.
     *
     * @param existing the capture currently held, possibly {@code null}
     * @param proposed the candidate capture, possibly {@code null}
     * @return {@code proposed} when {@code existing} is {@code null}; otherwise the
     *         result of {@code existing.selectNewestCapture(proposed)}
     */
    static DatapointCapture selectNewerCapture(final DatapointCapture existing, final DatapointCapture proposed) {
        return (existing == null) ? proposed : existing.selectNewestCapture(proposed);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@
import org.apache.logging.log4j.Logger;
import org.logstash.util.SetOnceReference;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -100,7 +96,7 @@ public Map<String, Double> getValue() {
this.retentionWindows.get()
.forEach(window -> window.baseline(currentCapture.nanoTime())
.or(() -> windowDefaultBaseline(window))
.map((baseline) -> calculateRate(currentCapture, baseline))
.map((baseline) -> calculateRate(currentCapture, (FlowCapture) baseline))
.orElseGet(OptionalDouble::empty)
.ifPresent((rate) -> rates.put(window.policy.policyName(), rate)));

Expand Down Expand Up @@ -132,16 +128,6 @@ private static void injectIntoRetentionWindows(final List<RetentionWindow> reten
retentionWindows.forEach((rw) -> rw.append(capture));
}

/**
 * Internal tooling to select the younger of two captures.
 * A {@code null} argument yields the other argument; when both have the same
 * nanoTime, {@code proposed} is returned.
 */
private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) {
    if (existing == null) {
        return proposed;
    }
    if (proposed != null && existing.nanoTime() <= proposed.nanoTime()) {
        return proposed;
    }
    return existing;
}

/**
* If a window's policy allows it to report before its retention has been reached,
* use our lifetime baseline as a default.
Expand Down Expand Up @@ -180,183 +166,4 @@ Duration estimateExcessRetained(final ToLongFunction<FlowMetricRetentionPolicy>
.sum();
return Duration.ofNanos(cumulativeExcessRetained);
}

/**
 * A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to
 * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture
 * that is older than the policy's allowed retention (if any).
 * The implementation is similar to a singly-linked list whose youngest captures are at
 * the tail and oldest captures are at the head, with an additional pre-tail stage.
 * Compaction is always done at read-time and occasionally at write-time.
 * Both reads and writes are non-blocking and concurrency-safe.
 */
private static class RetentionWindow {
    // the most recently offered capture; it is only linked into the list once a
    // later capture is far enough ahead of the tail (see append)
    private final AtomicReference<FlowCapture> stagedCapture = new AtomicReference<>();
    // youngest node that has been linked into the list
    private final AtomicReference<Node> tail;
    // oldest node still retained; advanced by compactHead
    private final AtomicReference<Node> head;
    private final FlowMetricRetentionPolicy policy;

    /**
     * @param policy      the retention policy this window honours
     * @param zeroCapture the initial capture; it seeds both head and tail
     */
    RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) {
        this.policy = policy;
        final Node zeroNode = new Node(zeroCapture);
        this.head = new AtomicReference<>(zeroNode);
        this.tail = new AtomicReference<>(zeroNode);
    }

    /**
     * Append the newest {@link FlowCapture} into this {@link RetentionWindow},
     * while respecting our {@link FlowMetricRetentionPolicy}.
     * We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but
     * expect callers of this method to minimize lag between instantiating the capture
     * and appending it.
     *
     * @param newestCapture the newest capture to stage
     */
    private void append(final FlowCapture newestCapture) {
        final Node casTail = this.tail.getAcquire(); // for CAS
        final long newestCaptureNanoTime = newestCapture.nanoTime();

        // stage our newest capture unless it is older than the currently-staged capture
        final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture);

        // promote our previously-staged capture IFF our newest capture is too far
        // ahead of the current tail to support policy's resolution.
        if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) {
            // attempt to set an _unlinked_ Node to our tail
            final Node proposedNode = new Node(previouslyStaged);
            if (this.tail.compareAndSet(casTail, proposedNode)) {
                // if we succeeded at setting an unlinked node, link to it from our old tail
                casTail.setNext(proposedNode);

                // perform a force-compaction of our head if necessary,
                // detected using plain memory access
                final Node currentHead = head.getPlain();
                final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime());
                if (LOGGER.isTraceEnabled()) {
                    // estimateSize walks the whole list, hence the level guard
                    LOGGER.trace("{} post-append result (captures: `{}` span: `{}`)", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos));
                }
                if (headAgeNanos > policy.forceCompactionNanos()) {
                    final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos()));
                    if (LOGGER.isDebugEnabled()) {
                        final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime());
                        LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos));
                    }
                }
            }
        }
    }

    /**
     * @return a short description made of the policy name and this instance's identity hash code
     */
    @Override
    public String toString() {
        return "RetentionWindow{" +
                "policy=" + policy.policyName() +
                " id=" + System.identityHashCode(this) +
                '}';
    }

    /**
     * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline.
     * @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older
     *         than this window's {@link FlowMetricRetentionPolicy} allowed retention if one
     *         exists, and is otherwise empty.
     */
    public Optional<FlowCapture> baseline(final long nanoTime) {
        final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos());
        // compaction happens on every read; the local is named so it does not shadow the `head` field
        final Node compactedHead = compactHead(barrier);
        if (compactedHead.captureNanoTime() <= barrier) {
            return Optional.of(compactedHead.capture);
        } else {
            return Optional.empty();
        }
    }

    /**
     * @return a computationally-expensive estimate of the number of captures in this window,
     *         using plain memory access. This should NOT be run in unguarded production code.
     */
    private static int estimateSize(final Node headNode) {
        int i = 1; // assume we have one additional staged
        // NOTE: we chase the provided headNode's tail with plain-gets,
        //       which tolerates missed appends from other threads.
        for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; }
        return i;
    }

    /**
     * @see RetentionWindow#estimateSize(Node)
     */
    private int estimateSize() {
        return estimateSize(this.head.getPlain());
    }

    /**
     * @param barrier a nanoTime that will NOT be crossed during compaction
     * @return the head node after compaction up to the provided barrier.
     */
    private Node compactHead(final long barrier) {
        return this.head.updateAndGet((existingHead) -> {
            final Node proposedHead = existingHead.seekWithoutCrossing(barrier);
            // seekWithoutCrossing yields null when the existing head is not older than the barrier; keep it then
            return Objects.requireNonNullElse(proposedHead, existingHead);
        });
    }

    /**
     * Internal testing support
     *
     * @param currentNanoTime         the nanoTime to measure against
     * @param retentionWindowFunction extracts a duration in nanos from this window's policy
     * @return how many nanos the head capture is older than
     *         {@code currentNanoTime - retentionWindowFunction(policy)}, never negative
     */
    private long excessRetained(final long currentNanoTime, final ToLongFunction<FlowMetricRetentionPolicy> retentionWindowFunction) {
        final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy));
        return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime()));
    }

    /**
     * A {@link Node} holds a single {@link FlowCapture} and
     * may link ahead to the next {@link Node}.
     * It is an implementation detail of {@link RetentionWindow}.
     */
    private static class Node {
        // handle used for plain (non-volatile) reads of `next`
        private static final VarHandle NEXT;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(Node.class, "next", Node.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private final FlowCapture capture;
        private volatile Node next;

        Node(final FlowCapture capture) {
            this.capture = capture;
        }

        /**
         * Walks the chain starting at this node while captures are strictly older than the barrier.
         *
         * @param barrier the nanoTime that must not be reached or crossed
         * @return the last visited node whose capture nanoTime is less than {@code barrier},
         *         or {@code null} when this node itself is not older than the barrier
         */
        Node seekWithoutCrossing(final long barrier) {
            Node newestOlderThanThreshold = null;
            Node candidate = this;

            while(candidate != null && candidate.captureNanoTime() < barrier) {
                newestOlderThanThreshold = candidate;
                candidate = candidate.getNext();
            }
            return newestOlderThanThreshold;
        }

        /**
         * @return the nanoTime of the capture held by this node
         */
        long captureNanoTime() {
            return this.capture.nanoTime();
        }

        // volatile write of the link to the following node
        void setNext(final Node nextNode) {
            next = nextNode;
        }

        // volatile read of the link to the following node
        Node getNext() {
            return next;
        }

        // plain read of the link via the VarHandle
        Node getNextPlain() {
            return (Node)NEXT.get(this);
        }
    }
}
}
Loading