Add agent metadata to observation snapshot
wavejumper committed Nov 18, 2024
1 parent d3af2cc commit b822adc
Showing 7 changed files with 92 additions and 62 deletions.
73 changes: 39 additions & 34 deletions src/clojure/io/factorhouse/kpow/agent.clj
@@ -90,17 +90,17 @@
(defn apply-metric-filters
[^MetricName metric-name filters]
(reduce
(fn [acc ^MetricFilter$FilterCriteria filter-criteria]
(let [metric-filter-type (.getFilterType filter-criteria)
predicate (.getPredicate filter-criteria)]
(if (.test predicate metric-name)
(reduced
(case (.name metric-filter-type)
"ACCEPT" true
"DENY" false))
acc)))
nil
filters))
(fn [acc ^MetricFilter$FilterCriteria filter-criteria]
(let [metric-filter-type (.getFilterType filter-criteria)
predicate (.getPredicate filter-criteria)]
(if (.test predicate metric-name)
(reduced
(case (.name metric-filter-type)
"ACCEPT" true
"DENY" false))
acc)))
nil
filters))

(defn numeric-metrics
[metrics ^MetricFilter metrics-filter]
@@ -124,7 +124,7 @@
:captured captured
:data data
:job/id job-id
:snapshot/id {:domain (first taxon) :id (second taxon)}}
:snapshot/id {:domain :streams :id taxon}}
record (ProducerRecord. (:topic snapshot-topic) taxon snapshot)]
(.get (.send producer record))))

@@ -138,48 +138,53 @@
:captured captured
:data (vec data)
:job/id job-id
:snapshot/id {:domain (first taxon) :id (second taxon)}}
:snapshot/id {:domain :streams :id taxon}}
record (ProducerRecord. (:topic snapshot-topic) taxon value)]
(.get (.send producer record))))
(log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))

(defn plan-send
[{:keys [snapshot-topic producer job-id captured taxon]}]
[{:keys [snapshot-topic producer job-id captured taxon metrics-summary]}]
(let [taxon (p/datafy taxon)
plan {:type :observation/plan
:captured captured
:snapshot/id {:domain (first taxon) :id (second taxon)}
:snapshot/id {:domain :streams :id taxon}
:job/id job-id
:data {:type :observe/streams-agent}}
:data {:type :observe/streams-agent
:agent {:metrics-summary metrics-summary
:version "1.0.0"}}}
record (ProducerRecord. (:topic snapshot-topic) taxon plan)]
(.get (.send producer record))))

(defn snapshot-telemetry
[{:keys [streams ^Topology topology metrics-filter ^KeyStrategy key-strategy] :as ctx}]
[{:keys [streams ^Topology topology ^MetricFilter metrics-filter ^KeyStrategy key-strategy] :as ctx}]
(let [metrics (metrics streams)]
(if (empty? metrics)
(log/warn "KafkStreams .metrics() method returned an empty collection, no telemetry was sent. Has something mutated the global metrics registry?")
(let [topology (p/datafy (.describe topology))
state (str (.state streams))
snapshot {:topology topology :state state}
client-id (client-id metrics)
application-id (application-id metrics)
taxon (.getTaxon key-strategy client-id application-id)
ctx (assoc ctx
:captured (System/currentTimeMillis)
:client-id client-id
:application-id application-id
:taxon taxon)]
(let [topology (p/datafy (.describe topology))
state (str (.state streams))
snapshot {:topology topology :state state}
client-id (client-id metrics)
application-id (application-id metrics)
taxon (.getTaxon key-strategy client-id application-id)
ctx (assoc ctx
:captured (System/currentTimeMillis)
:client-id client-id
:application-id application-id
:taxon taxon)
filtered-metrics (numeric-metrics metrics metrics-filter)]
(when (nil? application-id)
(throw (Exception. "Cannot infer application id from metrics returned from KafkaStreams instance. Expected metric \"application-id\" in the metrics registry.")))
(when (nil? client-id)
(throw (Exception.
(format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s"
(client-id-tag metrics)
application-id))))
(format "Cannot infer client id from metrics returned from KafkaStreams instance. Got: client-id %s and application-id %s"
(client-id-tag metrics)
application-id))))
(snapshot-send ctx snapshot)
(metrics-send ctx (numeric-metrics metrics metrics-filter))
ctx))))
(metrics-send ctx filtered-metrics)
(assoc ctx :metrics-summary {:total (count metrics)
:sent (count filtered-metrics)
:id (some-> metrics-filter .getFilterId)})))))

(defn snapshot-task
^Runnable [snapshot-topic producer registered-topologies metrics-filter latch]
@@ -194,7 +199,7 @@
(Thread/sleep 2000)
(plan-send next-ctx))
(catch Throwable e
(log/errorf e "Kpow: error sending streams snapshot for agent %s" id))))
(log/warnf e "Kpow: error sending streams snapshot for agent %s" id))))

(deliver latch true))))

21 changes: 20 additions & 1 deletion src/java/io/factorhouse/kpow/MetricFilter.java
@@ -8,6 +8,13 @@
import java.util.function.Predicate;

public class MetricFilter {

private String filterId = null;

public String getFilterId() {
return filterId;
}

public enum FilterType {
ACCEPT, DENY
}
@@ -35,6 +42,18 @@ public FilterType getFilterType() {

public MetricFilter() {
this.filters = new ArrayList<>();
this.filterId = "custom";
}

private MetricFilter(String id) {
this.filters = new ArrayList<>();
this.filterId = id;
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter("default")
.acceptNameStartsWith("streams.state")
.deny();
}

public List<FilterCriteria> getFilters() {
@@ -80,4 +99,4 @@ public MetricFilter denyNameStartsWith(String prefix) {
this.filters.add(criteria);
return this;
}
}
}
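
For orientation, here is a minimal sketch of how the revised MetricFilter API could be used from application code. It relies only on the members visible in this diff (defaultMetricFilter, getFilterId, acceptNameStartsWith, denyNameStartsWith, deny); the class name MetricFilterExample and the chosen metric-name prefixes are illustrative, not part of the change.

import io.factorhouse.kpow.MetricFilter;

public class MetricFilterExample {
    public static void main(String[] args) {
        // Built-in default: accept "streams.state" metrics, deny the rest.
        // Its filter id ("default") is what now surfaces in the observation
        // plan's agent metrics-summary metadata.
        MetricFilter defaults = MetricFilter.defaultMetricFilter();
        System.out.println(defaults.getFilterId()); // "default"

        // A filter built via the public constructor is tagged "custom".
        // The prefixes below are placeholders for illustration only.
        MetricFilter custom = new MetricFilter()
                .acceptNameStartsWith("rocksdb")
                .denyNameStartsWith("admin")
                .deny();
        System.out.println(custom.getFilterId()); // "custom"
    }
}

Whichever id a filter carries ends up in the plan's :agent {:metrics-summary {:id ...}} map, alongside the total and sent metric counts added in agent.clj above.
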
8 changes: 1 addition & 7 deletions src/java/io/factorhouse/kpow/StreamsRegistry.java
@@ -88,13 +88,7 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
}

public StreamsRegistry(Properties props) {
this(props, StreamsRegistry.defaultMetricFilter());
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter()
.acceptNameStartsWith("foo")
.deny();
this(props, MetricFilter.defaultMetricFilter());
}

public StreamsAgent register(KafkaStreams streams, Topology topology, KeyStrategy keyStrategy) {
2 changes: 1 addition & 1 deletion src/java/io/factorhouse/kpow/key/ClusterIdKeyStrategy.java
@@ -17,6 +17,6 @@ public ClusterIdKeyStrategy(Properties props) throws InterruptedException, Execu

@Override
public Taxon getTaxon(String clientId, String applicationId) {
return new Taxon("streams", clusterId, "streams-agent-cid", clientId);
return new Taxon("cluster", clusterId, "streams-agent-cid", clientId);
}
}
2 changes: 1 addition & 1 deletion src/java/io/factorhouse/kpow/key/ManualKeyStrategy.java
@@ -9,6 +9,6 @@ public ManualKeyStrategy(String envName) {

@Override
public Taxon getTaxon(String clientId, String applicationId) {
return new Taxon("streams", envName, "streams-agent-m", clientId);
return new Taxon("env", envName, "streams-agent-m", clientId);
}
}
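
A hedged sketch of registering a KafkaStreams instance against this API, assuming only the constructors and the register signature visible in the diffs above; the props, streams, and topology arguments are placeholders supplied by the caller, and RegisterSketch is an illustrative class name.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import io.factorhouse.kpow.StreamsRegistry;
import io.factorhouse.kpow.key.ClusterIdKeyStrategy;

public class RegisterSketch {
    public static void register(Properties props, KafkaStreams streams, Topology topology)
            throws InterruptedException, ExecutionException {
        // The single-argument constructor now falls back to
        // MetricFilter.defaultMetricFilter() rather than a registry-local default.
        StreamsRegistry registry = new StreamsRegistry(props);

        // After this change ClusterIdKeyStrategy keys snapshots under the
        // "cluster" domain and ManualKeyStrategy under "env"; both previously
        // used "streams".
        ClusterIdKeyStrategy keyStrategy = new ClusterIdKeyStrategy(props);
        registry.register(streams, topology, keyStrategy);
    }
}

Only the taxon's leading domain element changes for callers; the rest of the registration flow is as before.
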
10 changes: 2 additions & 8 deletions src/java/io/operatr/kpow/StreamsRegistry.java
@@ -87,14 +87,8 @@ public StreamsRegistry(Properties props, MetricFilter metricsFilter) {
agent = agentFn.invoke(producer, metricsFilter);
}

public static MetricFilter defaultMetricFilter() {
return new MetricFilter()
.acceptNameStartsWith("foo")
.deny();
}

public StreamsRegistry(Properties props) {
this(props, StreamsRegistry.defaultMetricFilter());
this(props, MetricFilter.defaultMetricFilter());
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
@@ -125,4 +119,4 @@ public void close() {
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
}
38 changes: 28 additions & 10 deletions test/io/factorhouse/agent_test.clj
@@ -106,16 +106,22 @@
:topic-pattern nil}}}},
:global-stores #{}},
:state "RUNNING"},
:snapshot/id {:domain :streams, :id "abc123"}}]
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}]
{:type :observation/plan
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}
:data {:type :observe/streams-agent
:agent {:metrics-summary {:id "custom"
:sent 2
:total 3}
:version "1.0.0"}}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :kafka/streams-agent-metrics,
:application-id "xxx",
:client-id "abc123",
:data [{:name "first.metric", :tags {}, :value 1.0}
{:name "second.metric", :tags {"client-id" "abc123"}, :value 2.0}],
:snapshot/id {:domain :streams, :id "abc123"}}]}
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]}
(into #{} (map (fn [record]
[(.key record) (dissoc (.value record) :job/id :captured)]))
@records)))
@@ -159,16 +165,22 @@
:topic-pattern nil}}}},
:global-stores #{}},
:state "RUNNING"},
:snapshot/id {:domain :streams, :id "abc123"}}]
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}]
{:type :observation/plan
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}
:data {:type :observe/streams-agent
:agent {:metrics-summary {:id "custom"
:sent 2
:total 4}
:version "1.0.0"}}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :kafka/streams-agent-metrics,
:application-id "xxx",
:client-id "abc123",
:data [{:name "first.metric", :tags {}, :value 1.0}
{:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}],
:snapshot/id {:domain :streams, :id "abc123"}}]}
:snapshot/id {:domain :streams, :id [:streams "abc123" :kafka/streams-agent]}}]}
(into #{} (map (fn [record]
[(.key record) (dissoc (.value record) :job/id :captured)]))
@records)))
@@ -196,7 +208,7 @@

(is (deref (:latch registry) 5000 false))

(is (= #{[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
(is (= #{[[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :kafka/streams-agent,
:application-id "xxx",
:client-id "abc123",
@@ -208,9 +220,15 @@
:topic-pattern nil}}}},
:global-stores #{}},
:state "RUNNING"},
:snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]
[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}]}
:snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]}}]
[[:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :observation/plan
:snapshot/id {:domain :streams, :id [:env "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]}
:data {:type :observe/streams-agent
:agent {:metrics-summary {:id "custom"
:sent 0
:total 4}
:version "1.0.0"}}}]}
(into #{} (map (fn [record]
[(.key record) (dissoc (.value record) :job/id :captured)]))
@records)))
