diff --git a/test/io/factorhouse/agent_test.clj b/test/io/factorhouse/agent_test.clj index 44cb69e..72d4b72 100644 --- a/test/io/factorhouse/agent_test.clj +++ b/test/io/factorhouse/agent_test.clj @@ -3,7 +3,7 @@ [clojure.test :refer :all] [io.factorhouse.kpow.agent :as agent]) (:import (io.factorhouse.kpow MetricFilter StreamsRegistry) - (io.factorhouse.kpow.key ClientIdKeyStrategy) + (io.factorhouse.kpow.key ClientIdKeyStrategy ManualKeyStrategy) (java.util Properties) (org.apache.kafka.clients.producer Producer) (org.apache.kafka.common Metric MetricName) @@ -126,3 +126,109 @@ (is (agent/unregister registry agent)) (is (empty? (agent/close-registry registry))))) + +(deftest agent-test-metric-filters + (let [records (atom []) + metrics-filter (-> (MetricFilter.) + (.denyNameStartsWith "second") + (.acceptNameStartsWith "first") + (.acceptNameStartsWith "rocksdb") + (.deny)) + registry (agent/init-registry (mock-producer records) metrics-filter) + agent (agent/register registry + (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) + (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") + (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) + (test-topology) + (ClientIdKeyStrategy.))] + + (is agent) + + (is (deref (:latch registry) 5000 false)) + + (is (= #{[[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id "abc123"}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :observation/plan, :snapshot/id {:domain :streams, :id "abc123"}, :data {:type :observe/streams-agent}}] + [[:streams "abc123" :kafka/streams-agent] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], + :snapshot/id {:domain :streams, :id "abc123"}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records))) + + (testing "consistent :captured value" + (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + + (is (agent/unregister registry agent)) + + (is (empty? (agent/close-registry registry))))) + +(deftest agent-test-manual-key-strategy + (let [records (atom []) + metrics-filter (-> (MetricFilter.) + (.denyNameStartsWith "second") + (.acceptNameStartsWith "first") + (.acceptNameStartsWith "rocksdb") + (.deny)) + registry (agent/init-registry (mock-producer records) metrics-filter) + agent (agent/register registry + (mock-streams [(mock-metric "first.metric" "first" "mock metric" {} 1.0) + (mock-metric "rocksdb.foo" "first" "mock metric" {"client-id" "abc123"} 3.0) + (mock-metric "application-id" "first" "mock metric" {"client-id" "abc123"} "xxx") + (mock-metric "second.metric" "first" "mock metric" {"client-id" "abc123"} 2.0)]) + (test-topology) + (ManualKeyStrategy. "Trade Book (Staging)"))] + + (is agent) + + (is (deref (:latch registry) 5000 false)) + + (is (= #{[[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :kafka/streams-agent, + :application-id "xxx", + :client-id "abc123", + :data {:topology {:sub-topologies #{{:id 0, + :nodes #{{:name "KSTREAM-SOURCE-0000000000", + :predecessors #{}, + :successors #{}, + :topic-set #{"__oprtr_snapshot_state"}, + :topic-pattern nil}}}}, + :global-stores #{}}, + :state "RUNNING"}, + :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}] + [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :observation/plan, :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}, :data {:type :observe/streams-agent}}] + [[:streams "Trade Book (Staging)" :kafka/streams-agent-m "abc123"] + {:type :kafka/streams-agent-metrics, + :application-id "xxx", + :client-id "abc123", + :data [{:name "first.metric", :tags {}, :value 1.0} + {:name "rocksdb.foo", :tags {"client-id" "abc123"}, :value 3.0}], + :snapshot/id {:domain :streams, :id "Trade Book (Staging)"}}]} + (into #{} (map (fn [record] + [(.key record) (dissoc (.value record) :job/id :captured)])) + @records))) + + (testing "consistent :captured value" + (is (= 1 (count (into #{} (map (fn [record] (-> record (.value) :captured))) @records))))) + + (is (agent/unregister registry agent)) + + (is (empty? (agent/close-registry registry)))))