diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index de201dc6679..a3c08c44b2a 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -242,6 +242,7 @@ dependencies { exclude group: 'com.google.guava', module: 'guava' } implementation 'org.javassist:javassist:3.30.2-GA' + implementation 'org.hdrhistogram:HdrHistogram:2.2.2' testImplementation "org.apache.logging.log4j:log4j-core:${log4jVersion}:tests" testImplementation 'org.hamcrest:hamcrest:2.2' testImplementation 'org.hamcrest:hamcrest-library:2.2' diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 0d8d34e66cf..239a32fe1de 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -203,10 +203,36 @@ def refine_batch_metrics(stats) result[:event_count][:average][window] = event_count_average_flow_metric[key]&.round if event_count_average_flow_metric[key] result[:byte_size][:average][window] = byte_size_average_flow_metric[key]&.round if byte_size_average_flow_metric[key] end + + if stats[:batch][:batch_byte_size] + # stats[:batch][:batch_byte_size] is an instance of org.logstash.instrument.metrics.HdrHistogramFlowMetric + # so need to call "value" to grab the map of sub-metrics which contains the histogram percentiles + # as org.logstash.instrument.metrics.HistogramMetricData + byte_size_histogram = stats[:batch][:batch_byte_size][:histogram] + [:last_1_minute, :last_5_minutes, :last_15_minutes].each do |window| + reshape_histogram_percentiles_for_window(:byte_size, byte_size_histogram, window, result) if byte_size_histogram.value[window.to_s] + end + end + if stats[:batch][:batch_event_count] + event_count_histogram = stats[:batch][:batch_event_count][:histogram] + [:last_1_minute, :last_5_minutes, :last_15_minutes].each do |window| + reshape_histogram_percentiles_for_window(:event_count, event_count_histogram, window, result) if event_count_histogram.value[window.to_s] + end + end result end private :refine_batch_metrics + def reshape_histogram_percentiles_for_window(target_field, histogram_metric, window, result) + result[target_field][:p50] = {} if result[target_field][:p50].nil? + result[target_field][:p90] = {} if result[target_field][:p90].nil? + + histogram_data = histogram_metric.value[window.to_s] + result[target_field][:p50][window] = histogram_data.get50Percentile.round + result[target_field][:p90][window] = histogram_data.get90Percentile.round + end + private :reshape_histogram_percentiles_for_window + def report(stats, extended_stats = nil, opts = {}) ret = { :events => stats[:events], diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index a91cdf5dedf..eb231b9b5ab 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -6,6 +6,8 @@ import org.logstash.ackedqueue.QueueFactoryExt; import org.logstash.ext.JrubyEventExtLibrary; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.instrument.metrics.HdrHistogramFlowMetric; +import org.logstash.instrument.metrics.HistogramFlowMetric; import org.logstash.instrument.metrics.counter.LongCounter; import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; @@ -23,6 +25,8 @@ class QueueReadClientBatchMetrics { private LongCounter pipelineMetricBatchCount; private LongCounter pipelineMetricBatchByteSize; private LongCounter pipelineMetricBatchTotalEvents; + private HistogramFlowMetric pipelineMetricBatchByteSizeFlowHistogram; + private HistogramFlowMetric pipelineMetricBatchEventCountFlowHistogram; private final SecureRandom random = new SecureRandom(); private LazyDelegatingGauge currentBatchDimensions; @@ -39,6 +43,12 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) { pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS); pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES); currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY); + pipelineMetricBatchByteSizeFlowHistogram = batchNamespace.asApiMetric() + .namespace("batch_byte_size") + .register("histogram", HdrHistogramFlowMetric.FACTORY); + pipelineMetricBatchEventCountFlowHistogram = batchNamespace.asApiMetric() + .namespace("batch_event_count") + .register("histogram", HdrHistogramFlowMetric.FACTORY); } } @@ -72,9 +82,12 @@ private void updateBatchSizeMetric(QueueBatch batch) { totalByteSize += rubyEvent.getEvent().estimateMemory(); } pipelineMetricBatchCount.increment(); - pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); + int batchEventsCount = batch.filteredSize(); + pipelineMetricBatchTotalEvents.increment(batchEventsCount); pipelineMetricBatchByteSize.increment(totalByteSize); - currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize)); + currentBatchDimensions.set(Arrays.asList(batchEventsCount, totalByteSize)); + pipelineMetricBatchByteSizeFlowHistogram.recordValue(totalByteSize); + pipelineMetricBatchEventCountFlowHistogram.recordValue(batchEventsCount); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java new file mode 100644 index 00000000000..921debab087 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +/** + * A {@link DatapointCapture} provides access to timing information + * captured at a specific point in time. + */ +abstract class DatapointCapture { + + private final long nanoTime; + + DatapointCapture(final long nanoTime) { + this.nanoTime = nanoTime; + } + + /** + * @return the nanoTime of this capture, as provided at time + * of capture by the {@link FlowMetric}. + */ + public long nanoTime() { + return nanoTime; + } + + DatapointCapture selectNewestCapture(final DatapointCapture proposed) { + if (proposed == null) { + return this; + } + + return (this.nanoTime() > proposed.nanoTime()) ? this : proposed; + } + + /** + * Internal tooling to select the younger of two captures + */ + static DatapointCapture selectNewerCapture(final DatapointCapture existing, final DatapointCapture proposed) { + if (existing == null) { + return proposed; + } + + return existing.selectNewestCapture(proposed); + } +} diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java index e93655f3d1f..7bc24f6a47f 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java @@ -23,8 +23,6 @@ import org.apache.logging.log4j.Logger; import org.logstash.util.SetOnceReference; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -32,10 +30,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.OptionalDouble; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -100,7 +96,7 @@ public Map getValue() { this.retentionWindows.get() .forEach(window -> window.baseline(currentCapture.nanoTime()) .or(() -> windowDefaultBaseline(window)) - .map((baseline) -> calculateRate(currentCapture, baseline)) + .map((baseline) -> calculateRate(currentCapture, (FlowCapture) baseline)) .orElseGet(OptionalDouble::empty) .ifPresent((rate) -> rates.put(window.policy.policyName(), rate))); @@ -132,16 +128,6 @@ private static void injectIntoRetentionWindows(final List reten retentionWindows.forEach((rw) -> rw.append(capture)); } - /** - * Internal tooling to select the younger of two captures - */ - private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) { - if (existing == null) { return proposed; } - if (proposed == null) { return existing; } - - return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed; - } - /** * If a window's policy allows it to report before its retention has been reached, * use our lifetime baseline as a default. @@ -180,183 +166,4 @@ Duration estimateExcessRetained(final ToLongFunction .sum(); return Duration.ofNanos(cumulativeExcessRetained); } - - /** - * A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to - * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture - * that is older than the policy's allowed retention (if any). - * The implementation is similar to a singly-linked list whose youngest captures are at - * the tail and oldest captures are at the head, with an additional pre-tail stage. - * Compaction is always done at read-time and occasionally at write-time. - * Both reads and writes are non-blocking and concurrency-safe. - */ - private static class RetentionWindow { - private final AtomicReference stagedCapture = new AtomicReference<>(); - private final AtomicReference tail; - private final AtomicReference head; - private final FlowMetricRetentionPolicy policy; - - RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) { - this.policy = policy; - final Node zeroNode = new Node(zeroCapture); - this.head = new AtomicReference<>(zeroNode); - this.tail = new AtomicReference<>(zeroNode); - } - - /** - * Append the newest {@link FlowCapture} into this {@link RetentionWindow}, - * while respecting our {@link FlowMetricRetentionPolicy}. - * We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but - * expect callers of this method to minimize lag between instantiating the capture - * and appending it. - * - * @param newestCapture the newest capture to stage - */ - private void append(final FlowCapture newestCapture) { - final Node casTail = this.tail.getAcquire(); // for CAS - final long newestCaptureNanoTime = newestCapture.nanoTime(); - - // stage our newest capture unless it is older than the currently-staged capture - final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture); - - // promote our previously-staged capture IFF our newest capture is too far - // ahead of the current tail to support policy's resolution. - if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { - // attempt to set an _unlinked_ Node to our tail - final Node proposedNode = new Node(previouslyStaged); - if (this.tail.compareAndSet(casTail, proposedNode)) { - // if we succeeded at setting an unlinked node, link to it from our old tail - casTail.setNext(proposedNode); - - // perform a force-compaction of our head if necessary, - // detected using plain memory access - final Node currentHead = head.getPlain(); - final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); - } - if (headAgeNanos > policy.forceCompactionNanos()) { - final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); - if (LOGGER.isDebugEnabled()) { - final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); - LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); - } - } - } - } - } - - @Override - public String toString() { - return "RetentionWindow{" + - "policy=" + policy.policyName() + - " id=" + System.identityHashCode(this) + - '}'; - } - - /** - * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. - * @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older - * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one - * exists, and is otherwise empty. - */ - public Optional baseline(final long nanoTime) { - final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); - final Node head = compactHead(barrier); - if (head.captureNanoTime() <= barrier) { - return Optional.of(head.capture); - } else { - return Optional.empty(); - } - } - - /** - * @return a computationally-expensive estimate of the number of captures in this window, - * using plain memory access. This should NOT be run in unguarded production code. - */ - private static int estimateSize(final Node headNode) { - int i = 1; // assume we have one additional staged - // NOTE: we chase the provided headNode's tail with plain-gets, - // which tolerates missed appends from other threads. - for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; } - return i; - } - - /** - * @see RetentionWindow#estimateSize(Node) - */ - private int estimateSize() { - return estimateSize(this.head.getPlain()); - } - - /** - * @param barrier a nanoTime that will NOT be crossed during compaction - * @return the head node after compaction up to the provided barrier. - */ - private Node compactHead(final long barrier) { - return this.head.updateAndGet((existingHead) -> { - final Node proposedHead = existingHead.seekWithoutCrossing(barrier); - return Objects.requireNonNullElse(proposedHead, existingHead); - }); - } - - /** - * Internal testing support - */ - private long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { - final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); - return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); - } - - /** - * A {@link Node} holds a single {@link FlowCapture} and - * may link ahead to the next {@link Node}. - * It is an implementation detail of {@link RetentionWindow}. - */ - private static class Node { - private static final VarHandle NEXT; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - - private final FlowCapture capture; - private volatile Node next; - - Node(final FlowCapture capture) { - this.capture = capture; - } - - Node seekWithoutCrossing(final long barrier) { - Node newestOlderThanThreshold = null; - Node candidate = this; - - while(candidate != null && candidate.captureNanoTime() < barrier) { - newestOlderThanThreshold = candidate; - candidate = candidate.getNext(); - } - return newestOlderThanThreshold; - } - - long captureNanoTime() { - return this.capture.nanoTime(); - } - - void setNext(final Node nextNode) { - next = nextNode; - } - - Node getNext() { - return next; - } - - Node getNextPlain() { - return (Node)NEXT.get(this); - } - } - } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java index 3b8444cd02b..d8bc5303bd7 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java @@ -26,26 +26,16 @@ * point-in-time data for a pair of {@link Metric}s. * It is immutable. */ -class FlowCapture { +class FlowCapture extends DatapointCapture { private final Number numerator; private final Number denominator; - private final long nanoTime; - FlowCapture(final long nanoTime, final Number numerator, final Number denominator) { + super(nanoTime); this.numerator = numerator; this.denominator = denominator; - this.nanoTime = nanoTime; - } - - /** - * @return the nanoTime of this capture, as provided at time - * of capture by the {@link FlowMetric}. - */ - public long nanoTime() { - return nanoTime; } /** @@ -65,7 +55,7 @@ public BigDecimal denominator() { @Override public String toString() { return getClass().getSimpleName() +"{" + - "nanoTimestamp=" + nanoTime + + "nanoTimestamp=" + nanoTime() + " numerator=" + numerator() + " denominator=" + denominator() + '}'; diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java index e36fba4b61d..a9d504d6be0 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetricRetentionPolicy.java @@ -32,6 +32,8 @@ interface FlowMetricRetentionPolicy { boolean reportBeforeSatisfied(); + long samplesCount(); + enum BuiltInRetentionPolicy implements FlowMetricRetentionPolicy { // MAX_RETENTION, MIN_RESOLUTION CURRENT(Duration.ofSeconds(10), Duration.ofSeconds(1), true), @@ -95,5 +97,10 @@ public long forceCompactionNanos() { public boolean reportBeforeSatisfied() { return this.reportBeforeSatisfied; } + + @Override + public long samplesCount() { + return retentionNanos / resolutionNanos; + } } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/HdrHistogramFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/HdrHistogramFlowMetric.java new file mode 100644 index 00000000000..3db06652d37 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/HdrHistogramFlowMetric.java @@ -0,0 +1,183 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +import co.elastic.logstash.api.UserMetric; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; + +public class HdrHistogramFlowMetric extends AbstractMetric> implements HistogramFlowMetric { + + /** + * Support class to hold a window of histogram snapshots and histogram recorder. + * */ + private static final class HistogramSnapshotsWindow { + private final FlowMetricRetentionPolicy retentionPolicy; + private final LongSupplier nanoTimeSupplier; + private final Recorder histogramRecorder; + private final AtomicLong lastRecordTimeNanos; + private final RetentionWindow recordWindow; + final long estimatedSizeInBytes; + + HistogramSnapshotsWindow(FlowMetricRetentionPolicy retentionPolicy, LongSupplier nanoTimeSupplier) { + this.retentionPolicy = retentionPolicy; + this.nanoTimeSupplier = nanoTimeSupplier; + this.histogramRecorder = new Recorder(1_000_000, 3); + long actualTime = nanoTimeSupplier.getAsLong(); + lastRecordTimeNanos = new AtomicLong(actualTime); + HistogramCapture initialCapture = new HistogramCapture(histogramRecorder.getIntervalHistogram(), actualTime); + recordWindow = new RetentionWindow(retentionPolicy, initialCapture); + estimatedSizeInBytes = initEstimatedSize(); + } + + private long initEstimatedSize() { + Histogram snapshotHistogram = histogramRecorder.getIntervalHistogram(); + int estimatedFootprintInBytes = snapshotHistogram.getEstimatedFootprintInBytes(); + + return recordWindow.policy.samplesCount() * estimatedFootprintInBytes; + } + + void recordValue(long totalByteSize) { + histogramRecorder.recordValue(totalByteSize); + + // Record on every call and create a snapshot iff we pass the flow metric policy resolution time + long currentTimeNanos = nanoTimeSupplier.getAsLong(); + long updatedLast = lastRecordTimeNanos.accumulateAndGet(currentTimeNanos, (last, current) -> { + if (current - last > retentionPolicy.resolutionNanos()) { + return current; + } else { + return last; + } + }); + + // If two threads race to update lastRecordTimeNanos, only one updates the variable. If two threads read + // currentTimeNanos that are different but really close to the other, then only one succeeds in updating the atomic long, + // and will satisfy updatedLast == currentTimeNanos, so will create the snapshot. The other will read a different + // updatedLast and the condition will fail. + // If two threads read exactly the same nanosecond time, also in this case only one will update the atomic long, but both + // will satisfy the condition and create two histogram snapshots, one of which will be probably empty. + // In this case the recordWindow.append prefer the newest, so in case of two snapshots created at the same time, + // the latter will be kept. Depending on the interleaving of the threads it could be that the empty one is kept. + // Is this a problem? Probably not, because it contributes to the calculation of the percentiles for a fraction, and + // given that it's a statistical approximation, doesn't really matter having exact numbers. + if (updatedLast == currentTimeNanos) { + // an update of the lastRecordTimeNanos happened, we need to create a snapshot + Histogram snapshotHistogram = histogramRecorder.getIntervalHistogram(); + LOG.debug("Capturing new histogram snapshot p50: {}, p90: {}", + snapshotHistogram.getValueAtPercentile(50), snapshotHistogram.getValueAtPercentile(90)); + recordWindow.append(new HistogramCapture(snapshotHistogram, currentTimeNanos)); + } + } + + HistogramMetricData computeAggregatedHistogramData() { + final Histogram windowAggregated = new Histogram(1_000_000, 3); + final long currentTimeNanos = nanoTimeSupplier.getAsLong(); + recordWindow.forEachCapture(dpc -> { + // When all captures fall outside of the retention window, the list still holds the last capture, + // so have to check retention here again + if ((currentTimeNanos - dpc.nanoTime()) > retentionPolicy.retentionNanos()) { + // skip captures outside of retention window + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping capture outside of retention window {}, expired {} seconds ago", + retentionPolicy.policyName(), + Duration.ofNanos((currentTimeNanos - dpc.nanoTime()) - retentionPolicy.retentionNanos()).toSeconds()); + } + return; + } + + if (dpc instanceof HistogramCapture hdp) { + windowAggregated.add(hdp.getHdrHistogram()); + } else { + LOG.warn("Found {} which is not a HistogramCapture in HdrHistogramFlowMetric retention window", + dpc.getClass().getName()); + } + }); + if (LOG.isTraceEnabled()) { + LOG.trace("Captures held for policy {}, estimated size: {}", retentionPolicy, recordWindow.countCaptures()); + } + return new HistogramMetricData(windowAggregated); + } + } + + private static final Logger LOG = LogManager.getLogger(HdrHistogramFlowMetric.class); + + public static UserMetric.Factory FACTORY = HistogramFlowMetric.PROVIDER.getFactory(HdrHistogramFlowMetric::new); + + private static final List SUPPORTED_POLICIES = List.of( + FlowMetricRetentionPolicy.BuiltInRetentionPolicy.LAST_1_MINUTE, + FlowMetricRetentionPolicy.BuiltInRetentionPolicy.LAST_5_MINUTES, + FlowMetricRetentionPolicy.BuiltInRetentionPolicy.LAST_15_MINUTES + ); + private final ConcurrentMap histogramsWindows = new ConcurrentHashMap<>(); + private final LongSupplier nanoTimeSupplier; + + /** + * Constructor + * + * @param name The name of this metric. This value may be used for display purposes. + */ + protected HdrHistogramFlowMetric(String name) { + this(name, System::nanoTime); + } + + // Used in tests + protected HdrHistogramFlowMetric(String name, LongSupplier nanoTimeSupplier) { + super(name); + this.nanoTimeSupplier = nanoTimeSupplier; + long totalEstimatedSize = 0L; + for (FlowMetricRetentionPolicy policy : SUPPORTED_POLICIES) { + HistogramSnapshotsWindow snapshotsWindow = new HistogramSnapshotsWindow(policy, nanoTimeSupplier); + totalEstimatedSize += snapshotsWindow.estimatedSizeInBytes; + histogramsWindows.put(policy, snapshotsWindow); + } + + LOG.info("Estimated memory footprint for histogram metric is approximately {} bytes", totalEstimatedSize); + } + + @Override + public Map getValue() { + final Map result = new HashMap<>(); + final long currentTimeNanos = nanoTimeSupplier.getAsLong(); + histogramsWindows.forEach((policy, window) -> { + window.recordWindow.baseline(currentTimeNanos).ifPresent(baseline -> { + result.put(policy.policyName().toLowerCase(), window.computeAggregatedHistogramData()); + }); + }); + return result; + } + + @Override + public void recordValue(long totalByteSize) { + for (FlowMetricRetentionPolicy policy : SUPPORTED_POLICIES) { + histogramsWindows.get(policy).recordValue(totalByteSize); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramCapture.java new file mode 100644 index 00000000000..711fc753e9c --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramCapture.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +import org.HdrHistogram.Histogram; + +public class HistogramCapture extends DatapointCapture { + + private final Histogram hdrHistogram; + + public HistogramCapture(Histogram histogram, long nanoTime) { + super(nanoTime); + this.hdrHistogram = histogram; + } + + public Histogram getHdrHistogram() { + return hdrHistogram; + } +} diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramFlowMetric.java new file mode 100644 index 00000000000..23851ccff7a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramFlowMetric.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.instrument.metrics; + +import co.elastic.logstash.api.UserMetric; + +import java.util.Map; + +public interface HistogramFlowMetric extends UserMetric>, + org.logstash.instrument.metrics.Metric> { + + Provider PROVIDER = new Provider<>(HistogramFlowMetric.class, new HistogramFlowMetric() { + @Override + public Map getValue() { + return Map.of(); + } + + @Override + public String getName() { + return "NULL"; + } + + @Override + public void recordValue(long totalByteSize) { + // no-op + } + }); + + void recordValue(long totalByteSize); + + @Override + default MetricType getType() { + return MetricType.USER; + } +} \ No newline at end of file diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramMetricData.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramMetricData.java new file mode 100644 index 00000000000..d12ce1a5b4b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/HistogramMetricData.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.HdrHistogram.Histogram; + +import java.io.Serial; +import java.io.Serializable; + +public class HistogramMetricData implements Serializable { + @Serial + private static final long serialVersionUID = 4711735381843512566L; + private final long percentile50; + private final long percentile90; + + public HistogramMetricData(Histogram hdrHistogram) { + percentile50 = hdrHistogram.getValueAtPercentile(50); + percentile90 = hdrHistogram.getValueAtPercentile(90); + } + + @JsonProperty("p50") + public double get50Percentile() { + return percentile50; + } + + @JsonProperty("p90") + public double get90Percentile() { + return percentile90; + } + + @Override + public String toString() { + return "HistogramMetricData{" + + "percentile50=" + percentile50 + + ", percentile90=" + percentile90 + + '}'; + } +} \ No newline at end of file diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java new file mode 100644 index 00000000000..ef4ad5527f2 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -0,0 +1,208 @@ +package org.logstash.instrument.metrics; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.ToLongFunction; + +/** + * A {@link RetentionWindow} efficiently holds sufficient {@link DatapointCapture}s to + * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture + * that is older than the policy's allowed retention (if any). + * The implementation is similar to a singly-linked list whose youngest captures are at + * the tail and oldest captures are at the head, with an additional pre-tail stage. + * Compaction is always done at read-time and occasionally at write-time. + * Both reads and writes are non-blocking and concurrency-safe. + */ +class RetentionWindow { + private final AtomicReference stagedCapture = new AtomicReference<>(); + private final AtomicReference tail; + private final AtomicReference head; + final FlowMetricRetentionPolicy policy; + + RetentionWindow(final FlowMetricRetentionPolicy policy, final DatapointCapture zeroCapture) { + this.policy = policy; + final Node zeroNode = new Node(zeroCapture); + this.head = new AtomicReference<>(zeroNode); + this.tail = new AtomicReference<>(zeroNode); + } + + /** + * Append the newest {@link DatapointCapture} into this {@link RetentionWindow}, + * while respecting our {@link FlowMetricRetentionPolicy}. + * We tolerate minor jitter in the provided {@link DatapointCapture#nanoTime()}, but + * expect callers of this method to minimize lag between instantiating the capture + * and appending it. + * + * @param newestCapture the newest capture to stage + */ + void append(final DatapointCapture newestCapture) { + final Node casTail = this.tail.getAcquire(); // for CAS + final long newestCaptureNanoTime = newestCapture.nanoTime(); + + // stage our newest capture unless it is older than the currently-staged capture + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewerCapture); + + // promote our previously-staged capture IFF our newest capture is too far + // ahead of the current tail to support policy's resolution. + if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { + // attempt to set an _unlinked_ Node to our tail + final Node proposedNode = new Node(previouslyStaged); + if (this.tail.compareAndSet(casTail, proposedNode)) { + // if we succeeded at setting an unlinked node, link to it from our old tail + casTail.setNext(proposedNode); + + // perform a force-compaction of our head if necessary, + // detected using plain memory access + final Node currentHead = head.getPlain(); + final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); + if (ExtendedFlowMetric.LOGGER.isTraceEnabled()) { + ExtendedFlowMetric.LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); + } + if (headAgeNanos > policy.forceCompactionNanos()) { + final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); + if (ExtendedFlowMetric.LOGGER.isDebugEnabled()) { + final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); + ExtendedFlowMetric.LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); + } + } + } + } + } + + /** + * Iterate over all retained captures in this window, in order from oldest to newest. + * Used by client code to compute aggregate statistics. + * */ + protected void forEachCapture(final Consumer consumer) { + Node current = this.head.getPlain(); + while (current != null) { + consumer.accept(current.capture); + current = current.getNext(); + } + } + + @Override + public String toString() { + return "RetentionWindow{" + + "policy=" + policy.policyName() + + " id=" + System.identityHashCode(this) + + '}'; + } + + /** + * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. + * @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older + * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one + * exists, and is otherwise empty. + */ + public Optional baseline(final long nanoTime) { + final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); + final Node head = compactHead(barrier); + if (head.captureNanoTime() <= barrier) { + return Optional.of(head.capture); + } else { + return Optional.empty(); + } + } + + /** + * @return a computationally-expensive estimate of the number of captures in this window, + * using plain memory access. This should NOT be run in unguarded production code. + */ + private static int estimateSize(final Node headNode) { + int i = 1; // assume we have one additional staged + // NOTE: we chase the provided headNode's tail with plain-gets, + // which tolerates missed appends from other threads. + for (Node current = headNode; current != null; current = current.getNextPlain()) { + i++; + } + return i; + } + + /** + * @see RetentionWindow#estimateSize(Node) + */ + int estimateSize() { + return estimateSize(this.head.getPlain()); + } + + int countCaptures() { + return estimateSize(); + } + + /** + * @param barrier a nanoTime that will NOT be crossed during compaction + * @return the head node after compaction up to the provided barrier. + */ + private Node compactHead(final long barrier) { + return this.head.updateAndGet((existingHead) -> { + final Node proposedHead = existingHead.seekWithoutCrossing(barrier); + return Objects.requireNonNullElse(proposedHead, existingHead); + }); + } + + /** + * Internal testing support + */ + long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { + final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); + return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); + } + + /** + * A {@link Node} holds a single {@link DatapointCapture} and + * may link ahead to the next {@link Node}. + * It is an implementation detail of {@link RetentionWindow}. + */ + private static class Node { + private static final VarHandle NEXT; + + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + NEXT = l.findVarHandle(Node.class, "next", Node.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final DatapointCapture capture; + private volatile Node next; + + Node(final DatapointCapture capture) { + this.capture = capture; + } + + Node seekWithoutCrossing(final long barrier) { + Node newestOlderThanThreshold = null; + Node candidate = this; + + while (candidate != null && candidate.captureNanoTime() < barrier) { + newestOlderThanThreshold = candidate; + candidate = candidate.getNext(); + } + return newestOlderThanThreshold; + } + + long captureNanoTime() { + return this.capture.nanoTime(); + } + + void setNext(final Node nextNode) { + next = nextNode; + } + + Node getNext() { + return next; + } + + Node getNextPlain() { + return (Node) NEXT.get(this); + } + } +} diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/HdrHistogramFlowMetricTest.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/HdrHistogramFlowMetricTest.java new file mode 100644 index 00000000000..cdb5363ad1e --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/HdrHistogramFlowMetricTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +import org.HdrHistogram.Histogram; +import org.junit.Before; +import org.junit.Test; +import org.logstash.testutils.time.ManualAdvanceClock; + +import java.security.SecureRandom; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +public class HdrHistogramFlowMetricTest { + + private final SecureRandom random = new SecureRandom(); + + private ManualAdvanceClock clock; + private HdrHistogramFlowMetric sut; + + @Before + public void setUp() { + clock = new ManualAdvanceClock(Instant.now()); + sut = new HdrHistogramFlowMetric("histogram", clock::nanoTime); + } + + @Test + public void givenMostlyStaticValues_whenRecordingValues_thenHistogramsReflectThis() { + Histogram referenceHistogram = new Histogram(1_000_000, 3); + + // Record values for 60 seconds + for (int i = 0; i < 45; i++) { + sut.recordValue(100); + referenceHistogram.recordValue(100); + clock.advance(Duration.ofSeconds(1)); + } + for (int i = 0; i < 15; i++) { + sut.recordValue(200); + referenceHistogram.recordValue(200); + clock.advance(Duration.ofSeconds(1)); + } + + // Retrieve histogram values + Map histogramMap = sut.getValue(); + System.out.println(histogramMap); + + assertThat("contains just the last 1 minute histogram", histogramMap, + allOf(aMapWithSize(1), hasKey("last_1_minute"))); + + // Check against the reference histogram + HistogramMetricData last1MinuteData = histogramMap.get("last_1_minute"); + assertEquals(referenceHistogram.getValueAtPercentile(50), last1MinuteData.get50Percentile(), 0.1); + assertEquals(referenceHistogram.getValueAtPercentile(90), last1MinuteData.get90Percentile(), 0.1); + } + + + @Test + public void givenRunningMetricForMoreMinutesThenHistogramsHasToReflectTimeAndValues() { + // Record values for 4 minutes, recording low values, 80% of the time 100, 20% of the time 200 + for (int i = 0; i < 4 * 60; i++) { + if (random.nextInt(100) < 80) { + sut.recordValue(100); + } else { + sut.recordValue(200); + } + clock.advance(Duration.ofSeconds(1)); + } + + // Then for 1 minute record a spike + for (int i = 0; i < 60; i++) { + if (random.nextInt(100) < 80) { + sut.recordValue(1000); + } else { + sut.recordValue(1500); + } + clock.advance(Duration.ofSeconds(1)); + } + + // Retrieve histogram values and verify values for the time windows + Map histogramMap = sut.getValue(); + + assertThat("contains just last 1 minute and 5 minutes histograms", histogramMap, + allOf(aMapWithSize(2), hasKey("last_1_minute"), hasKey("last_5_minutes"))); + + // Since values are uniformly distributed, we can check expected percentiles + HistogramMetricData last1MinuteData = histogramMap.get("last_1_minute"); + assertEquals(1000, last1MinuteData.get50Percentile(), 10); + assertEquals(1500, last1MinuteData.get90Percentile(), 10); + HistogramMetricData last5MinutesData = histogramMap.get("last_5_minutes"); + assertEquals(100, last5MinutesData.get50Percentile(), 10); + assertEquals(1000, last5MinutesData.get90Percentile(), 10); + } + + @Test + public void givenRunningMetricWhenNoDataComesInForLastMinuteThenHistogramReflectsThisDrop() { + // Record values for 4 minutes, recording low values, 80% of the time 100, 20% of the time 200 + for (int i = 0; i < 4 * 60; i++) { + if (random.nextInt(100) < 80) { + sut.recordValue(100); + } else { + sut.recordValue(200); + } + clock.advance(Duration.ofSeconds(1)); + } + + // Then for 1 minute record no values at all + clock.advance(Duration.ofSeconds(60)); + + // Retrieve histogram values and verify values for the time windows + Map histogramMap = sut.getValue(); + + assertThat("contains just last 1 minute and 5 minutes histograms", histogramMap, + allOf(aMapWithSize(2), hasKey("last_1_minute"), hasKey("last_5_minutes"))); + + // Since values are uniformly distributed, we can check expected percentiles + HistogramMetricData last1MinuteData = histogramMap.get("last_1_minute"); + assertEquals(0, last1MinuteData.get50Percentile(), 10); + assertEquals(0, last1MinuteData.get90Percentile(), 10); + HistogramMetricData last5MinutesData = histogramMap.get("last_5_minutes"); + assertEquals(100, last5MinutesData.get50Percentile(), 10); + assertEquals(200, last5MinutesData.get90Percentile(), 10); + } +} \ No newline at end of file diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv index 6133921b5fa..936629ab930 100644 --- a/tools/dependencies-report/src/main/resources/licenseMapping.csv +++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv @@ -169,6 +169,7 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "org.eclipse.jdt:org.eclipse.jdt.core:",http://www.eclipse.org/jdt,EPL-1.0 "org.eclipse.osgi:org.eclipse.osgi:",http://www.eclipse.org/jdt,EPL-1.0 "org.eclipse.text:org.eclipse.text:",http://www.eclipse.org/jdt,EPL-1.0 +"org.hdrhistogram:HdrHistogram:",https://github.com/HdrHistogram/HdrHistogram,BSD-2-Clause "org.javassist:javassist:",https://github.com/jboss-javassist/javassist,Apache-2.0 "org.jruby:jruby-core:",http://jruby.org/,EPL-2.0 "org.logstash:jvm-options-parser:",http://github.com/elastic/logstash,Apache-2.0 diff --git a/tools/dependencies-report/src/main/resources/notices/org.hdrhistogram!HdrHistogram-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/org.hdrhistogram!HdrHistogram-NOTICE.txt new file mode 100644 index 00000000000..ce8e7c0ea14 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/org.hdrhistogram!HdrHistogram-NOTICE.txt @@ -0,0 +1,41 @@ +The code in this repository code was Written by Gil Tene, Michael Barker, +and Matt Warren, and released to the public domain, as explained at +http://creativecommons.org/publicdomain/zero/1.0/ + +For users of this code who wish to consume it under the "BSD" license +rather than under the public domain or CC0 contribution text mentioned +above, the code found under this directory is *also* provided under the +following license (commonly referred to as the BSD 2-Clause License). This +license does not detract from the above stated release of the code into +the public domain, and simply represents an additional license granted by +the Author. + +----------------------------------------------------------------------------- +** Beginning of "BSD 2-Clause License" text. ** + + Copyright (c) 2012, 2013, 2014, 2015, 2016 Gil Tene + Copyright (c) 2014 Michael Barker + Copyright (c) 2014 Matt Warren + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file