Skip to content

Commit

Permalink
Add allow/deny predicates to MetricFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
wavejumper committed Nov 12, 2024
1 parent 169d88f commit 15b57e1
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 61 deletions.
33 changes: 21 additions & 12 deletions src/clojure/io/factorhouse/kpow/agent.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
[clojure.core.protocols :as p])
(:import (java.util UUID)
(java.util.concurrent Executors TimeUnit ThreadFactory)
(java.util.function Predicate)
(org.apache.kafka.clients.producer Producer ProducerRecord)
(org.apache.kafka.common MetricName)
(org.apache.kafka.streams Topology KeyValue TopologyDescription TopologyDescription$Subtopology
TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source
TopologyDescription$Processor TopologyDescription$Sink)
(io.factorhouse.kpow MetricsFilter)
(io.factorhouse.kpow MetricFilter MetricFilter$FilterCriteria)
(java.util.concurrent.atomic AtomicInteger)))

(def kpow-snapshot-topic
Expand Down Expand Up @@ -81,22 +80,32 @@
(str/split #"-StreamThread-")
(first))))

(defn apply-metric-filters
[^MetricName metric-name filters]
(reduce
(fn [acc ^MetricFilter$FilterCriteria filter-criteria]
(let [metric-filter-type (.getFilterType filter-criteria)
predicate (.getPredicate filter-criteria)]
(if (.test predicate metric-name)
(reduced
(case (.name metric-filter-type)
"ACCEPT" true
"DENY" false))
acc)))
nil
filters))

(defn numeric-metrics
[metrics ^MetricsFilter metrics-filter]
(let [filters (.getFilters metrics-filter)
filter-fn (if (seq filters)
(fn [{:keys [metric-name]}]
(some (fn [^Predicate predicate]
(.test predicate ^MetricName metric-name))
filters))
(constantly identity))]
[metrics ^MetricFilter metrics-filter]
(let [filters (.getFilters metrics-filter)]
(into [] (comp (filter (comp number? :value))
(remove (fn [{:keys [value]}]
(if (double? value)
(Double/isNaN value)
false)))
(map #(select-keys % [:name :tags :value]))
(filter filter-fn))
(filter (fn [{:keys [metric-name]}]
(apply-metric-filters metric-name filters)))
(map #(select-keys % [:name :tags :value])))
metrics)))

(defn snapshot-send
Expand Down
83 changes: 83 additions & 0 deletions src/java/io/factorhouse/kpow/MetricFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.factorhouse.kpow;

import org.apache.kafka.common.MetricName;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class MetricFilter {
public enum FilterType {
ACCEPT, DENY
}

public static class FilterCriteria {
private final Predicate<MetricName> predicate;
private final FilterType filterType;

// Constructor to initialize both fields
private FilterCriteria(Predicate<MetricName> predicate, FilterType filterType) {
this.predicate = predicate;
this.filterType = filterType;
}

public Predicate<MetricName> getPredicate() {
return predicate;
}

public FilterType getFilterType() {
return filterType;
}
}

private final List<FilterCriteria> filters;

public MetricFilter() {
this.filters = new ArrayList<>();
}

public List<FilterCriteria> getFilters() {
return Collections.unmodifiableList(filters);
}

public MetricFilter accept() {
Predicate<MetricName> acceptPredicate = (_filter) -> { return true; };
FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT);
this.filters.add(criteria);
return this;
}

public MetricFilter accept(Predicate<MetricName> acceptFilter) {
FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
this.filters.add(criteria);
return this;
}

public MetricFilter deny() {
Predicate<MetricName> denyFilter = (_filter) -> { return true; };
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
this.filters.add(criteria);
return this;
}

public MetricFilter deny(Predicate<MetricName> denyFilter) {
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
this.filters.add(criteria);
return this;
}

public MetricFilter acceptNameStartsWith(String prefix) {
Predicate<MetricName> acceptFilter = (metricName) -> { return metricName.name().startsWith(prefix); };
FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
this.filters.add(criteria);
return this;
}

public MetricFilter denyNameStartsWith(String prefix) {
Predicate<MetricName> denyFilter = (metricName) -> { return metricName.name().startsWith(prefix); };
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
this.filters.add(criteria);
return this;
}
}
40 changes: 0 additions & 40 deletions src/java/io/factorhouse/kpow/MetricsFilter.java

This file was deleted.

11 changes: 8 additions & 3 deletions src/java/io/factorhouse/kpow/StreamsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import clojure.java.api.Clojure;
import clojure.lang.IFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
Expand Down Expand Up @@ -74,7 +73,7 @@ public static Properties filterProperties(Properties props) {
return nextProps;
}

public StreamsRegistry(Properties props, MetricsFilter metricsFilter) {
public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
Expand All @@ -87,8 +86,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) {
agent = agentFn.invoke(producer, metricsFilter);
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter()
.acceptNameStartsWith("foo")
.deny();
}

public StreamsRegistry(Properties props) {
this(props, MetricsFilter.defaultMetricsFilter().build());
this(props, StreamsRegistry.defaultMetricFilter());
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
Expand Down
13 changes: 9 additions & 4 deletions src/java/io/operatr/kpow/StreamsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import clojure.java.api.Clojure;
import clojure.lang.IFn;
import io.factorhouse.kpow.MetricsFilter;
import io.factorhouse.kpow.MetricFilter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
Expand Down Expand Up @@ -75,7 +74,7 @@ public static Properties filterProperties(Properties props) {
return nextProps;
}

public StreamsRegistry(Properties props, MetricsFilter metricsFilter) {
public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
Expand All @@ -88,8 +87,14 @@ public StreamsRegistry(Properties props, MetricsFilter metricsFilter) {
agent = agentFn.invoke(producer, metricsFilter);
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter()
.acceptNameStartsWith("foo")
.deny();
}

public StreamsRegistry(Properties props) {
this(props, MetricsFilter.defaultMetricsFilter().build());
this(props, StreamsRegistry.defaultMetricFilter());
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
Expand Down
4 changes: 2 additions & 2 deletions test/io/factorhouse/agent_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(:require [clojure.core.protocols :as p]
[clojure.test :refer :all]
[io.factorhouse.kpow.agent :as agent])
(:import (io.factorhouse.kpow MetricsFilter StreamsRegistry)
(:import (io.factorhouse.kpow MetricFilter StreamsRegistry)
(java.util Properties)
(org.apache.kafka.clients.producer Producer)
(org.apache.kafka.common Metric MetricName)
Expand Down Expand Up @@ -80,7 +80,7 @@

(deftest agent-test
(let [records (atom [])
metrics-filter (.build (MetricsFilter/emptyMetricsFilter))
metrics-filter (-> (MetricFilter.) (.accept))
registry (agent/init-registry (mock-producer records) metrics-filter)
agent (agent/register registry
(mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
Expand Down

0 comments on commit 15b57e1

Please sign in to comment.