From 15b57e14a6b32c055336dc2a46970350046d6788 Mon Sep 17 00:00:00 2001 From: Thomas Crowley Date: Tue, 12 Nov 2024 17:16:37 +1100 Subject: [PATCH] Add allow/deny predicates to MetricFilter --- src/clojure/io/factorhouse/kpow/agent.clj | 33 +++++--- .../io/factorhouse/kpow/MetricFilter.java | 83 +++++++++++++++++++ .../io/factorhouse/kpow/MetricsFilter.java | 40 --------- .../io/factorhouse/kpow/StreamsRegistry.java | 11 ++- src/java/io/operatr/kpow/StreamsRegistry.java | 13 ++- test/io/factorhouse/agent_test.clj | 4 +- 6 files changed, 123 insertions(+), 61 deletions(-) create mode 100644 src/java/io/factorhouse/kpow/MetricFilter.java delete mode 100644 src/java/io/factorhouse/kpow/MetricsFilter.java diff --git a/src/clojure/io/factorhouse/kpow/agent.clj b/src/clojure/io/factorhouse/kpow/agent.clj index dd7bd8d..a4a725c 100644 --- a/src/clojure/io/factorhouse/kpow/agent.clj +++ b/src/clojure/io/factorhouse/kpow/agent.clj @@ -4,13 +4,12 @@ [clojure.core.protocols :as p]) (:import (java.util UUID) (java.util.concurrent Executors TimeUnit ThreadFactory) - (java.util.function Predicate) (org.apache.kafka.clients.producer Producer ProducerRecord) (org.apache.kafka.common MetricName) (org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source TopologyDescription$Processor TopologyDescription$Sink) - (io.factorhouse.kpow MetricsFilter) + (io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria) (java.util.concurrent.atomic AtomicInteger))) (def kpow-snapshot-topic @@ -81,22 +80,32 @@ (str/split #"-StreamThread-") (first)))) +(defn apply-metric-filters + [^MetricName metric-name filters] + (reduce + (fn [acc ^MetricFilter$FilterCriteria filter-criteria] + (let [metric-filter-type (.getFilterType filter-criteria) + predicate (.getPredicate filter-criteria)] + (if (.test predicate metric-name) + (reduced + (case (.name metric-filter-type) + "ACCEPT" true + "DENY" false)) + acc))) + nil + filters)) + (defn numeric-metrics - [metrics ^MetricsFilter metrics-filter] - (let [filters (.getFilters metrics-filter) - filter-fn (if (seq filters) - (fn [{:keys [metric-name]}] - (some (fn [^Predicate predicate] - (.test predicate ^MetricName metric-name)) - filters)) - (constantly identity))] + [metrics ^MetricFilter metrics-filter] + (let [filters (.getFilters metrics-filter)] (into [] (comp (filter (comp number? :value)) (remove (fn [{:keys [value]}] (if (double? value) (Double/isNaN value) false))) - (map #(select-keys % [:name :tags :value])) - (filter filter-fn)) + (filter (fn [{:keys [metric-name]}] + (apply-metric-filters metric-name filters))) + (map #(select-keys % [:name :tags :value]))) metrics))) (defn snapshot-send diff --git a/src/java/io/factorhouse/kpow/MetricFilter.java b/src/java/io/factorhouse/kpow/MetricFilter.java new file mode 100644 index 0000000..58cefc2 --- /dev/null +++ b/src/java/io/factorhouse/kpow/MetricFilter.java @@ -0,0 +1,83 @@ +package io.factorhouse.kpow; + +import org.apache.kafka.common.MetricName; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; + +public class MetricFilter { + public enum FilterType { + ACCEPT, DENY + } + + public static class FilterCriteria { + private final Predicate predicate; + private final FilterType filterType; + + // Constructor to initialize both fields + private FilterCriteria(Predicate predicate, FilterType filterType) { + this.predicate = predicate; + this.filterType = filterType; + } + + public Predicate getPredicate() { + return predicate; + } + + public FilterType getFilterType() { + return filterType; + } + } + + private final List filters; + + public MetricFilter() { + this.filters = new ArrayList<>(); + } + + public List getFilters() { + return Collections.unmodifiableList(filters); + } + + public MetricFilter accept() { + Predicate acceptPredicate = (_filter) -> { return true; }; + FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter accept(Predicate acceptFilter) { + FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter deny() { + Predicate denyFilter = (_filter) -> { return true; }; + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } + + public MetricFilter deny(Predicate denyFilter) { + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } + + public MetricFilter acceptNameStartsWith(String prefix) { + Predicate acceptFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT); + this.filters.add(criteria); + return this; + } + + public MetricFilter denyNameStartsWith(String prefix) { + Predicate denyFilter = (metricName) -> { return metricName.name().startsWith(prefix); }; + FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY); + this.filters.add(criteria); + return this; + } +} \ No newline at end of file diff --git a/src/java/io/factorhouse/kpow/MetricsFilter.java b/src/java/io/factorhouse/kpow/MetricsFilter.java deleted file mode 100644 index 9660d3d..0000000 --- a/src/java/io/factorhouse/kpow/MetricsFilter.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.factorhouse.kpow; - -import org.apache.kafka.common.MetricName; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.Predicate; - -public class MetricsFilter { - private final List> filters; - - private MetricsFilter(List> filters) { - this.filters = new ArrayList<>(filters); - } - - public List> getFilters() { - return Collections.unmodifiableList(filters); - } - - public static Builder defaultMetricsFilter() { - return new Builder().addFilter(metricName -> "streams.state".equals(metricName.name())); - } - - public static Builder emptyMetricsFilter() { - return new Builder(); - } - - public static class Builder { - private final List> filters = new ArrayList<>(); - - public Builder addFilter(Predicate filter) { - filters.add(filter); - return this; - } - - public MetricsFilter build() { - return new MetricsFilter(filters); - } - } -} \ No newline at end of file diff --git a/src/java/io/factorhouse/kpow/StreamsRegistry.java b/src/java/io/factorhouse/kpow/StreamsRegistry.java index 6d59b8e..9a910e4 100644 --- a/src/java/io/factorhouse/kpow/StreamsRegistry.java +++ b/src/java/io/factorhouse/kpow/StreamsRegistry.java @@ -3,7 +3,6 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -74,7 +73,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { + public StreamsRegistry(Properties props, MetricFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -87,8 +86,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } + public static MetricFilter defaultMetricFilter() { + return new MetricFilter() + .acceptNameStartsWith("foo") + .deny(); + } + public StreamsRegistry(Properties props) { - this(props, MetricsFilter.defaultMetricsFilter().build()); + this(props, StreamsRegistry.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/src/java/io/operatr/kpow/StreamsRegistry.java b/src/java/io/operatr/kpow/StreamsRegistry.java index fd71562..cb9b75f 100644 --- a/src/java/io/operatr/kpow/StreamsRegistry.java +++ b/src/java/io/operatr/kpow/StreamsRegistry.java @@ -2,9 +2,8 @@ import clojure.java.api.Clojure; import clojure.lang.IFn; -import io.factorhouse.kpow.MetricsFilter; +import io.factorhouse.kpow.MetricFilter; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; @@ -75,7 +74,7 @@ public static Properties filterProperties(Properties props) { return nextProps; } - public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { + public StreamsRegistry(Properties props, MetricFilter metricsFilter) { IFn require = Clojure.var("clojure.core", "require"); require.invoke(Clojure.read("io.factorhouse.kpow.agent")); IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry"); @@ -88,8 +87,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) { agent = agentFn.invoke(producer, metricsFilter); } + public static MetricFilter defaultMetricFilter() { + return new MetricFilter() + .acceptNameStartsWith("foo") + .deny(); + } + public StreamsRegistry(Properties props) { - this(props, MetricsFilter.defaultMetricsFilter().build()); + this(props, StreamsRegistry.defaultMetricFilter()); } public StreamsAgent register(KafkaStreams streams, Topology topology) { diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 52e8b16..25b4fdc 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -2,7 +2,7 @@ (:require [clojure.core.protocols :as p] [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) - (:import (io.factorhouse.kpow MetricsFilter StreamsRegistry) + (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -80,7 +80,7 @@ (deftest agent-test (let [records (atom []) - metrics-filter (.build (MetricsFilter/emptyMetricsFilter)) + metrics-filter (-> (MetricFilter.) (.accept)) registry (agent/init-registry (mock-producer records) metrics-filter) agent (agent/register registry (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)