Skip to content

Commit

Permalink
Complete javadoc coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
wavejumper committed Jan 8, 2025
1 parent 5943313 commit bee8a20
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ jobs:
- run: lein kaocha
- run: lein kondo
- run: lein fmt
- name: lint javadoc
- name: Lint javadoc
run: ./scripts/javadoc.sh
162 changes: 119 additions & 43 deletions src/java/io/factorhouse/kpow/MetricFilter.java
Original file line number Diff line number Diff line change
@@ -1,45 +1,94 @@
package io.factorhouse.kpow;

import org.apache.kafka.common.MetricName;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import org.apache.kafka.common.MetricName;

/**
*
* The MetricFilter class defines and manages a set of criteria for filtering metrics from a Kafka Streams application.
* Instances of this class can be used to either explicitly specify which metrics to accept or deny,
* based on various conditions such as the metric name, tags, or custom predicates.
* The class provides static methods to create common filters, such as accepting all metrics, denying all metrics, filtering
* by state store metrics, or applying default settings for a typical Kafka Streams application.
*
*/
public class MetricFilter {

private String filterId = null;

/**
* Returns a unique identifier used by Kpow's user interface to describe which MetricFilter has been configured.
*
* @return The identifier of the MetricFilter
*/
public String getFilterId() {
return filterId;
}

/**
* Enum representing the type of filter operation.
*/
public enum FilterType {
ACCEPT, DENY
/**
* Represents the acceptance of a metric.
*/
ACCEPT,

/**
* Represents the denial of a metric.
*/
DENY,
}

/**
* The FilterCriteria class encapsulates both the filtering type and the predicate used to define a MetricFilter.
*/
public static class FilterCriteria {

private final Predicate<MetricName> predicate;
private final FilterType filterType;

// Constructor to initialize both fields
private FilterCriteria(Predicate<MetricName> predicate, FilterType filterType) {
/**
* Constructs a new {@link FilterCriteria} object with the specified predicate and filter type.
*
* @param predicate The predicate used to define which metrics should be accepted or denied
* @param filterType The type of filter operation (ACCEPT or DENY)
*/
private FilterCriteria(
Predicate<MetricName> predicate,
FilterType filterType
) {
this.predicate = predicate;
this.filterType = filterType;
}

/**
* Returns the predicate used in this filter criteria.
*
* @return The predicate for filtering metric names.
*/
public Predicate<MetricName> getPredicate() {
return predicate;
}

/**
* Returns the type of operation (ACCEPT or DENY) associated with this filter criteria.
*
* @return The filter type (ACCEPT or DENY).
*/
public FilterType getFilterType() {
return filterType;
}
}

private final List<FilterCriteria> filters;

/**
* Creates a new MetricFilter instance for custom-defined filters.
*/
public MetricFilter() {
this.filters = new ArrayList<>();
this.filterId = "custom";
Expand Down Expand Up @@ -76,15 +125,16 @@ public static MetricFilter denyAllMetricFilter() {
*/
public static MetricFilter stateStoreMetricsOnlyFilter() {
Predicate<MetricName> stateStoreMetricsOnly = m ->
m.tags().containsKey("store") ||
m.tags().containsKey("in-memory-state-id") ||
m.tags().containsKey("in-memory-window-state-id") ||
m.tags().containsKey("in-memory-session-state-id") ||
m.tags().containsKey("rocksdb-session-state-id") ||
m.tags().containsKey("rocksdb-state-id") ||
m.tags().containsKey("rocksdb-window-state-id");
return new MetricFilter("stateStoreMetricsOnly")
.accept(stateStoreMetricsOnly);
m.tags().containsKey("store") ||
m.tags().containsKey("in-memory-state-id") ||
m.tags().containsKey("in-memory-window-state-id") ||
m.tags().containsKey("in-memory-session-state-id") ||
m.tags().containsKey("rocksdb-session-state-id") ||
m.tags().containsKey("rocksdb-state-id") ||
m.tags().containsKey("rocksdb-window-state-id");
return new MetricFilter("stateStoreMetricsOnly").accept(
stateStoreMetricsOnly
);
}

/**
Expand Down Expand Up @@ -113,26 +163,31 @@ public static MetricFilter stateStoreMetricsOnlyFilter() {
*/
public static MetricFilter defaultMetricFilter() {
return new MetricFilter("default")
// Latency
.acceptNameStartsWith("commit-latency-avg")
.acceptNameStartsWith("process-latency-avg")
.acceptNameStartsWith("poll-latency-avg")
// Throughput
.acceptNameStartsWith("process-rate")
.acceptNameStartsWith("records-processed-rate")
// Lag
.acceptNameStartsWith("commit-rate")
.acceptNameStartsWith("records-lag-max")
.acceptNameStartsWith("records-lag")
// Stability
.acceptNameStartsWith("failed-stream-threads")
.acceptNameStartsWith("rebalances")
// State store health
.acceptNameStartsWith("put-rate")
.acceptNameStartsWith("get-rate")
.acceptNameStartsWith("flush-rate");
// Latency
.acceptNameStartsWith("commit-latency-avg")
.acceptNameStartsWith("process-latency-avg")
.acceptNameStartsWith("poll-latency-avg")
// Throughput
.acceptNameStartsWith("process-rate")
.acceptNameStartsWith("records-processed-rate")
// Lag
.acceptNameStartsWith("commit-rate")
.acceptNameStartsWith("records-lag-max")
.acceptNameStartsWith("records-lag")
// Stability
.acceptNameStartsWith("failed-stream-threads")
.acceptNameStartsWith("rebalances")
// State store health
.acceptNameStartsWith("put-rate")
.acceptNameStartsWith("get-rate")
.acceptNameStartsWith("flush-rate");
}

/**
* Returns an unmodifiable list of {@link FilterCriteria} objects representing the current filter rules applied by this MetricFilter.
*
* @return An unmodifiable list of {@link FilterCriteria}
*/
public List<FilterCriteria> getFilters() {
return Collections.unmodifiableList(filters);
}
Expand All @@ -143,21 +198,28 @@ public List<FilterCriteria> getFilters() {
* @return an updated MetricFilter
*/
public MetricFilter accept() {
Predicate<MetricName> acceptPredicate = (_filter) -> {
Predicate<MetricName> acceptPredicate = _filter -> {
return true;
};
FilterCriteria criteria = new FilterCriteria(acceptPredicate, FilterType.ACCEPT);
FilterCriteria criteria = new FilterCriteria(
acceptPredicate,
FilterType.ACCEPT
);
this.filters.add(criteria);
return this;
}

/**
* Accepts a metric based on the specified Predicate.
*
* @param acceptFilter the predicate used to determine if a metric should be accepted
* @return an updated MetricFilter
*/
public MetricFilter accept(Predicate<MetricName> acceptFilter) {
FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
FilterCriteria criteria = new FilterCriteria(
acceptFilter,
FilterType.ACCEPT
);
this.filters.add(criteria);
return this;
}
Expand All @@ -168,51 +230,65 @@ public MetricFilter accept(Predicate<MetricName> acceptFilter) {
* @return an updated MetricFilter
*/
public MetricFilter deny() {
Predicate<MetricName> denyFilter = (_filter) -> {
Predicate<MetricName> denyFilter = _filter -> {
return true;
};
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
FilterCriteria criteria = new FilterCriteria(
denyFilter,
FilterType.DENY
);
this.filters.add(criteria);
return this;
}

/**
* Denies a metric based on the specified Predicate.
*
* @param denyFilter the predicate used to determine if a metric should be denied
* @return an updated MetricFilter
*/
public MetricFilter deny(Predicate<MetricName> denyFilter) {
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
FilterCriteria criteria = new FilterCriteria(
denyFilter,
FilterType.DENY
);
this.filters.add(criteria);
return this;
}

/**
* Accepts all metrics whose name start with the specified prefix.
*
* @param prefix the prefix of the metric names to accept
* @return an updated MetricFilter
*/
public MetricFilter acceptNameStartsWith(String prefix) {
Predicate<MetricName> acceptFilter = (metricName) -> {
Predicate<MetricName> acceptFilter = metricName -> {
return metricName.name().startsWith(prefix);
};
FilterCriteria criteria = new FilterCriteria(acceptFilter, FilterType.ACCEPT);
FilterCriteria criteria = new FilterCriteria(
acceptFilter,
FilterType.ACCEPT
);
this.filters.add(criteria);
return this;
}

/**
* Denies all metrics whose name start with the specified prefix.
*
* @param prefix the prefix of the metric names to deny
* @return an updated MetricFilter
*/
public MetricFilter denyNameStartsWith(String prefix) {
Predicate<MetricName> denyFilter = (metricName) -> {
Predicate<MetricName> denyFilter = metricName -> {
return metricName.name().startsWith(prefix);
};
FilterCriteria criteria = new FilterCriteria(denyFilter, FilterType.DENY);
FilterCriteria criteria = new FilterCriteria(
denyFilter,
FilterType.DENY
);
this.filters.add(criteria);
return this;
}
}

Loading

0 comments on commit bee8a20

Please sign in to comment.