Commit

Flesh out agent impl
wavejumper committed Mar 2, 2021
1 parent 1adaca7 commit 17d2336
Showing 8 changed files with 468 additions and 8 deletions.
1 change: 1 addition & 0 deletions dev-resources/config.edn
@@ -0,0 +1 @@
{}
13 changes: 11 additions & 2 deletions project.clj
@@ -1,5 +1,14 @@
(defproject streams-agent "0.1.0"
:description "kPow streams agent"
:url "https://github.com/operatr-io/streams-agent"
-  :dependencies [[org.clojure/clojure "1.10.2"]]
-  :repl-options {:init-ns streams-agent.core})
+  :dependencies [[org.clojure/clojure "1.10.2"]
+                 [org.clojure/core.async "1.3.610"]
+                 [org.clojure/tools.logging "1.1.0"]
+                 [com.cognitect/transit-clj "1.0.324"]
+                 [org.apache.kafka/kafka-streams "2.6.0" :scope "provided"]]
+  :uberjar {:prep-tasks ["clean" "javac" "compile"]
+            :aot :all
+            :omit-source true}
+  :profiles {:dev {:resource-paths ["dev-resources"]}}
+  :java-source-paths ["src/java"]
+  :source-paths ["src/clojure"])
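
Note that kafka-streams is declared with :scope "provided": the agent compiles against it, but a host application supplies its own Kafka Streams at runtime. A hypothetical consumer's dependency vector might look like this (the agent coordinate is illustrative):

  :dependencies [[streams-agent "0.1.0"]
                 [org.apache.kafka/kafka-streams "2.6.0"]]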
202 changes: 202 additions & 0 deletions src/clojure/com/operatr/kpow/agent.clj
@@ -0,0 +1,202 @@
(ns com.operatr.kpow.agent
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.core.async :as async]
[clojure.core.protocols :as p])
(:import (org.apache.kafka.clients.producer Producer ProducerRecord)
(org.apache.kafka.streams KafkaStreams Topology KeyValue TopologyDescription TopologyDescription$Subtopology
TopologyDescription$GlobalStore TopologyDescription$Node TopologyDescription$Source
TopologyDescription$Processor TopologyDescription$Sink)
(java.util UUID)))

(extend-protocol p/Datafiable
KeyValue
(datafy [kv]
{:key (.key kv)
:value (.value kv)})

TopologyDescription
(datafy [td]
{:sub-topologies (set (map p/datafy (.subtopologies td)))
:global-stores (set (map p/datafy (.globalStores td)))})

TopologyDescription$Subtopology
(datafy [st]
{:id (.id st)
:nodes (set (map p/datafy (.nodes st)))})

TopologyDescription$GlobalStore
(datafy [gs]
{:id (.id gs)
:source (p/datafy (.source gs))
:processor (p/datafy (.processor gs))})

TopologyDescription$Node
(datafy [node]
(cond-> {:name (.name node)
:predecessors (set (map #(identity {:name (.name %)}) (.predecessors node)))
:successors (set (map #(identity {:name (.name %)}) (.successors node)))}
(instance? TopologyDescription$Source node)
(merge {:topic-set (.topicSet ^TopologyDescription$Source node)
:topic-pattern (some-> (.topicPattern ^TopologyDescription$Source node) str)})
(instance? TopologyDescription$Processor node)
(assoc :stores (.stores ^TopologyDescription$Processor node))
(instance? TopologyDescription$Sink node)
(merge {:topic (.topic ^TopologyDescription$Sink node)
:topic-extraction? (not (nil? (.topicNameExtractor ^TopologyDescription$Sink node)))}))))

(def kpow-snapshot-topic
  {:topic "__oprtr_snapshot_state"})

(defn metrics
[^KafkaStreams streams]
(into [] (map (fn [[_ metric]]
(let [metric-name (.metricName metric)]
{:value (.metricValue metric)
:description (.description metric-name)
:group (.group metric-name)
:name (.name metric-name)
:tags (into {} (.tags metric-name))})))
(.metrics streams)))

(defn application-id
[metrics]
(some #(when (= "application-id" (:name %)) (:value %)) metrics))

(defn client-id
[metrics]
(let [app-id (application-id metrics)
client-id (some #(get-in % [:tags "client-id"]) metrics)]
(-> (str/replace client-id (str app-id "-") "")
(str/split #"-StreamThread-")
(first))))

(defn numeric-metrics
[metrics]
(->> metrics
(filter (comp number? :value))
(remove (fn [{:keys [value]}]
(if (double? value)
(Double/isNaN value)
false)))
(map #(select-keys % [:name :tags :value]))))

(defn snapshot-send
[{:keys [snapshot-topic ^Producer producer snapshot-id application-id job-id client-id]} data]
(let [captured (System/currentTimeMillis)
snapshot {:type :kafka/streams-agent
:application-id application-id
:client-id client-id
:captured captured
:data data
:job/id job-id
:snapshot/id snapshot-id}
taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
record (ProducerRecord. (:topic snapshot-topic) taxon snapshot)]
(.get (.send producer record))))

(defn metrics-send
[{:keys [snapshot-topic producer snapshot-id application-id job-id client-id]} metrics]
(let [taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]]
(doseq [data (partition-all 250 metrics)]
(let [captured (System/currentTimeMillis)
value {:type :kafka/streams-agent-metrics
:application-id application-id
:client-id client-id
:captured captured
:data (vec data)
:job/id job-id
:snapshot/id snapshot-id}
record (ProducerRecord. (:topic snapshot-topic) taxon value)]
(.get (.send producer record))))))

(defn plan-send
[{:keys [snapshot-topic producer snapshot-id job-id]}]
(let [captured (System/currentTimeMillis)
taxon [(:domain snapshot-id) (:id snapshot-id) :kafka/streams-agent]
plan {:type :observation/plan
:captured captured
:snapshot/id snapshot-id
:job/id job-id
:data {:type :observe/streams-agent}}
record (ProducerRecord. (:topic snapshot-topic) taxon plan)]
(.get (.send producer record))))

(defn snapshot-telemetry
[{:keys [^KafkaStreams streams ^Topology topology] :as ctx}]
(let [topology (p/datafy (.describe topology))
state (str (.state streams))
;; TODO: do metrics go directly to metrics topic, or as discrete snapshot events?
metrics (metrics streams)
snapshot {:topology topology :state state}
client-id (client-id metrics)
application-id (application-id metrics)
ctx (assoc ctx
:client-id client-id
:application-id application-id
:snapshot-id {:domain :streams :id client-id})]
(snapshot-send ctx snapshot)
(metrics-send ctx (numeric-metrics metrics))
ctx))

(defn snapshot-loop
[registered-topologies _close-ch snapshot-topic producer]
(async/go-loop []
(let [job-id (str (UUID/randomUUID))
ctx {:job-id job-id
:snapshot-topic snapshot-topic
:producer producer}]

(doseq [[_ [streams topology]] @registered-topologies]
(try (let [next-ctx (snapshot-telemetry (assoc ctx :streams streams :topology topology))]
(async/<! (async/timeout 2000))
(plan-send next-ctx))
(catch Throwable e
(log/error e "kPow: error sending streams snapshot")))))

(async/<! (async/timeout 60000))
(recur)))

(defn start-agent
[{:keys [snapshot-topic producer]}]
(log/info "kPow: starting agent")
(let [registered-topologies (atom {})
close-ch (async/chan)
register-fn (fn [streams topology]
(let [id (str (UUID/randomUUID))]
(swap! registered-topologies assoc id [streams topology])
id))]
{:register register-fn
:close-ch close-ch
:topologies registered-topologies
:snapshot-loop (snapshot-loop registered-topologies close-ch snapshot-topic producer)}))

(defn close-agent
[agent]
(log/info "kPow: closing agent")
(when-let [close-ch (:close-ch agent)]
(async/close! close-ch))
(when-let [snapshot-loop (:snapshot-loop agent)]
(async/close! snapshot-loop))
(when-let [registered-topologies (:topologies agent)]
(reset! registered-topologies {}))
{})

(defn init-agent
[producer]
(start-agent {:snapshot-topic kpow-snapshot-topic :producer producer}))

(defn register
[agent streams topology]
(when-let [register-fn (:register agent)]
(let [id (register-fn streams topology)]
(log/infof "kPow: registring new KafkaStreams instance %s" id)
id)))

(defn unregister
[agent ^String id]
(when-let [registered-topologies (:topologies agent)]
(swap! registered-topologies dissoc id)
(log/infof "kPow: unregistered KafkaStreams instance %s" id)
true))
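
The agent exposes a small lifecycle API: init-agent wraps a Kafka producer, register adds a KafkaStreams/Topology pair to the snapshot loop, and unregister/close-agent stop reporting. A minimal usage sketch, assuming producer-props configures serializers able to handle the Clojure snapshot maps (for example the transit serdes below); the example namespace and property maps are hypothetical:

  (ns example.app
    (:require [com.operatr.kpow.agent :as agent])
    (:import (org.apache.kafka.clients.producer KafkaProducer)
             (org.apache.kafka.streams KafkaStreams StreamsBuilder)))

  (defn start! [producer-props streams-props]
    (let [producer (KafkaProducer. producer-props)   ;; must serialize the snapshot keys/values
          kpow     (agent/init-agent producer)
          builder  (StreamsBuilder.)
          ;; ... build the topology on `builder` ...
          topology (.build builder)
          streams  (KafkaStreams. topology streams-props)
          id       (agent/register kpow streams topology)]
      (.start streams)
      {:agent kpow :registration-id id :streams streams}))

  ;; later, to stop snapshotting and shut the agent down:
  ;; (agent/unregister kpow id)
  ;; (agent/close-agent kpow)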

106 changes: 106 additions & 0 deletions src/clojure/com/operatr/kpow/serdes.clj
@@ -0,0 +1,106 @@
(ns com.operatr.kpow.serdes
(:require [cognitect.transit :as transit])
(:import (java.io ByteArrayOutputStream ByteArrayInputStream)
(org.apache.kafka.streams.kstream Windowed Window)
(org.apache.kafka.common.serialization Serde Deserializer Serializer)))

(def windowed-writer
(transit/write-handler
(constantly "wd")
(fn [^Windowed v]
(let [k (.key v)
window (.window v)
start-ms (.start window)
end-ms (.end window)]
[start-ms end-ms k]))
(fn [v]
(let [k (.key v)
window (.window v)
start-ms (.start window)
end-ms (.end window)]
(pr-str [start-ms end-ms k])))))

(defn window [start-ms end-ms]
(proxy [Window] [start-ms end-ms]))

(defn windowed-reader
[[start-ms end-ms k]]
(Windowed. k (window start-ms end-ms)))

(def write-opts
{:handlers {Windowed windowed-writer}})

(def read-opts
{:handlers {"wd" (transit/read-handler windowed-reader)}})

(defn transit-serialize
[format data]
(when data
(let [stream (ByteArrayOutputStream.)]
(transit/write (transit/writer stream format write-opts) data)
(.toByteArray stream))))

(defn transit-deserialize
[format bytes]
(when bytes
(transit/read (transit/reader (ByteArrayInputStream. bytes) format read-opts))))

(deftype TransitJsonSerializer []
Serializer
(configure [_ _ _])
(serialize [_ _ data] (transit-serialize :json data))
(close [_]))

(deftype TransitJsonDeserializer []
Deserializer
(configure [_ _ _])
(deserialize [_ _ bytes] (transit-deserialize :json bytes))
(close [_]))

(deftype TransitJsonSerde []
Serde
(configure [_ _ _])
(close [_])
(serializer [_] (TransitJsonSerializer.))
(deserializer [_] (TransitJsonDeserializer.)))

(deftype TransitJsonVerboseSerializer []
Serializer
(configure [_ _ _])
(serialize [_ _ data] (transit-serialize :json-verbose data))
(close [_]))

(deftype TransitJsonVerboseDeserializer []
Deserializer
(configure [_ _ _])
(deserialize [_ _ bytes] (transit-deserialize :json-verbose bytes))
(close [_]))

(deftype TransitJsonVerboseSerde []
Serde
(configure [_ _ _])
(close [_])
  (serializer [_] (TransitJsonVerboseSerializer.))
  (deserializer [_] (TransitJsonVerboseDeserializer.)))

(deftype TransitMsgpackSerializer []
Serializer
(configure [_ _ _])
(serialize [_ _ data] (transit-serialize :msgpack data))
(close [_]))

(deftype TransitMsgpackDeserializer []
Deserializer
(configure [_ _ _])
(deserialize [_ _ bytes] (transit-deserialize :msgpack bytes))
(close [_]))

(deftype TransitMsgpackSerde []
Serde
(configure [_ _ _])
(close [_])
(serializer [_] (TransitMsgpackSerializer.))
(deserializer [_] (TransitMsgpackDeserializer.)))

(defn transit-json-serializer []
(TransitJsonSerializer.))
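
These serdes are plain Transit round-trips over byte arrays, with a custom write handler for windowed keys. A small REPL-style sketch of the expected behaviour (example values are illustrative):

  (require '[com.operatr.kpow.serdes :as serdes])

  ;; round-trip a Clojure value through the :json format
  (def payload (serdes/transit-serialize :json {:hello "world" :n 42}))
  (serdes/transit-deserialize :json payload)
  ;; => {:hello "world", :n 42}

  ;; or via the Kafka Serializer interface
  (def ser (serdes/transit-json-serializer))
  (.serialize ser "some-topic" {:hello "world"})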
13 changes: 13 additions & 0 deletions src/java/com/operatr/kpow/StreamsAgent.java
@@ -0,0 +1,13 @@
package com.operatr.kpow;

public class StreamsAgent {
private final String _id;

StreamsAgent(String id) {
_id = id;
}

public String getId() {
return _id;
}
}