Skip to content

Commit

Permalink
Add unit tests for key strat + metric filters
Browse files Browse the repository at this point in the history
  • Loading branch information
wavejumper committed Nov 13, 2024
1 parent 4a8708e commit df2ecb3
Showing 1 changed file with 107 additions and 1 deletion.
108 changes: 107 additions & 1 deletion test/io/factorhouse/agent_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[clojure.test :refer :all]
[io.factorhouse.kpow.agent :as agent])
(:import (io.factorhouse.kpow MetricFilter StreamsRegistry)
(io.factorhouse.kpow.key ClientIdKeyStrategy)
(io.factorhouse.kpow.key ClientIdKeyStrategy ManualKeyStrategy)
(java.util Properties)
(org.apache.kafka.clients.producer Producer)
(org.apache.kafka.common Metric MetricName)
Expand Down Expand Up @@ -126,3 +126,109 @@
(is (agent/unregister registry agent))

(is (empty? (agent/close-registry registry)))))

(deftest agent-test-metric-filters
(let [records (atom [])
metrics-filter (-> (MetricFilter.)
(.denyNameStartsWith "second")
(.acceptNameStartsWith "first")
(.acceptNameStartsWith "rocksdb")
(.deny))
registry (agent/init-registry (mock-producer records) metrics-filter)
agent (agent/register registry
(mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
(mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0)
(mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
(mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
(test-topology)
(ClientIdKeyStrategy.))]

(is agent)

(is (deref (:latch registry) 5000 false))

(is (= #{[[:streams "abc123" :kafka/streams-agent]
{:type :kafka/streams-agent,
:application-id "xxx",
:client-id "abc123",
:data {:topology {:sub-topologies #{{:id 0,
:nodes #{{:name "KSTREAM-SOURCE-0000000000",
:predecessors #{},
:successors #{},
:topic-set #{"__oprtr_snapshot_state"},
:topic-pattern nil}}}},
:global-stores #{}},
:state "RUNNING"},
:snapshot/id {:domain :streams, :id "abc123"}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}]
[[:streams "abc123" :kafka/streams-agent]
{:type :kafka/streams-agent-metrics,
:application-id "xxx",
:client-id "abc123",
:data [{:name "first.metric", :tags {}, :value 1.0}
{:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}],
:snapshot/id {:domain :streams, :id "abc123"}}]}
(into #{} (map (fn [record]
[(.key record) (dissoc (.value record) :job/id :captured)]))
@records)))

(testing "consistent :captured value"
(is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records)))))

(is (agent/unregister registry agent))

(is (empty? (agent/close-registry registry)))))

(deftest agent-test-manual-key-strategy
(let [records (atom [])
metrics-filter (-> (MetricFilter.)
(.denyNameStartsWith "second")
(.acceptNameStartsWith "first")
(.acceptNameStartsWith "rocksdb")
(.deny))
registry (agent/init-registry (mock-producer records) metrics-filter)
agent (agent/register registry
(mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0)
(mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0)
(mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx")
(mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)])
(test-topology)
(ManualKeyStrategy. "Trade Book (Staging)"))]

(is agent)

(is (deref (:latch registry) 5000 false))

(is (= #{[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :kafka/streams-agent,
:application-id "xxx",
:client-id "abc123",
:data {:topology {:sub-topologies #{{:id 0,
:nodes #{{:name "KSTREAM-SOURCE-0000000000",
:predecessors #{},
:successors #{},
:topic-set #{"__oprtr_snapshot_state"},
:topic-pattern nil}}}},
:global-stores #{}},
:state "RUNNING"},
:snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]
[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}]
[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"]
{:type :kafka/streams-agent-metrics,
:application-id "xxx",
:client-id "abc123",
:data [{:name "first.metric", :tags {}, :value 1.0}
{:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}],
:snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]}
(into #{} (map (fn [record]
[(.key record) (dissoc (.value record) :job/id :captured)]))
@records)))

(testing "consistent :captured value"
(is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records)))))

(is (agent/unregister registry agent))

(is (empty? (agent/close-registry registry)))))

0 comments on commit df2ecb3

Please sign in to comment.