Merge pull request #6 from factorhouse/update-to-factorhouse
update-to-factorhouse
d-t-w authored May 8, 2024
2 parents 8cefd7c + 3e080a8 commit aa9a791
Showing 17 changed files with 189 additions and 58 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,15 @@
# Change Log
All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## [1.0.0] - 2024-05-07
### Changed
- Move to `io.factorhouse` domain from `io.operatr`
- Keep old `io.operatr.kpow.StreamsRegistry` entry point for backwards compatibility
- Bump Kafka Streams to 3.6.1 (same as Kpow)
- Default producer `enable.idempotence` to false (avoid ACL issues with Kafka 3.2.0+)
- Bump other dependencies to latest
- Update readme and images

## [0.2.12] - 2024-04-10
### Changed
- Bump dependencies, fix CVE-2024-22871
@@ -55,7 +64,6 @@ All notable changes to this project will be documented in this file. This change
### Fixed
- Fixed formatting of logging statement


## [0.2.0] - 2021-23-06
### Added
- `io.operatr.kpow.StreamsRegistry` (initial release)
24 changes: 12 additions & 12 deletions README.md
@@ -8,28 +8,27 @@ This repository contains the Kpow Streams Agent.
Use this agent to integrate your Kafka Streams applications with Kpow and unlock the following features:

* See summaries of Kafka Streams activity for your Kafka cluster(s).
* Monitor Kafka Streams metrics (e.g Streams State, Stream-Thread, State Store, RocksDB, etc).
* Visualise Kafka Streams topologies in the Kpow Workflows UI.
* Monitor Kafka Streams metrics (e.g Stream-Thread, State Store, RocksDB, etc).
* Aggregate and Expose Kafka Streams metrics via Kpow [Prometheus Endpoints](https://docs.kpow.io/features/prometheus) (for alerting, etc).
* (Soon) View Kpow Insights of your Kafka Streams applications (outlier metrics, etc).
* Aggregate and Expose Kafka Streams metrics via Kpow [Prometheus Endpoints](https://docs.factorhouse.io/kpow-ee/features/prometheus/) (for alerting, etc).

See the [Kpow Kafka Streams Feature Guide](https://docs.kpow.io/features/kafka-streams) for full documentation.
See the [Kpow Kafka Streams Feature Guide](https://docs.factorhouse.io/kpow-ee/features/kafka-streams/) for full documentation.

See the [Kpow Kafka Streams Spring Word Count Example](https://github.com/factorhouse/kpow-streams-spring-cloud-example) for an integration of Spring, Kafka, and Kpow.

---

![streams-ui](docs/streams.png)
![streams-ui](docs/kpow-streams.png)

---

![topology-ui](docs/topologies.png)
![topology-ui](docs/kpow-streams-topology.png)

# Prerequisites

The Kpow Streams Agent requires a running instance of Kpow.

Evaluate Kpow with the [Kpow Local](https://github.com/factorhouse/kpow-local) repository or see our [Quick Start](https://docs.kpow.io/installation/quick-start) guide.
Evaluate Kpow with the [Kpow Local](https://github.com/factorhouse/kpow-local) repository or see our [Quick Start](https://docs.factorhouse.io/kpow-ee/installation/quick-start/) guide.

# Installation

@@ -49,11 +48,11 @@ Include the agent as a dependency in your Kafka Streams application.

In your application, just before you start your KafkaStreams instance:

* Create a new [io.operatr.kpow.StreamsRegistry](https://github.com/factorhouse/kpow-streams-agent/blob/main/src/java/io/operatr/kpow/StreamsRegistry.java) instance.
* Create a new [io.factorhouse.kpow.StreamsRegistry](https://github.com/factorhouse/kpow-streams-agent/blob/main/src/java/io/factorhouse/kpow/StreamsRegistry.java) instance.
* Register your KafkaStreams and Topology instances with the StreamsRegistry.

```java
import io.operatr.kpow.StreamsRegistry;
import io.factorhouse.kpow.StreamsRegistry;

// Your Kafka Streams topology
Topology topology = createMyTopology();
@@ -162,7 +161,7 @@ StreamsRegistry registry = new StreamsRegistry(primaryProps);
...
```
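
The diff collapses the rest of this example. A minimal sketch of the full registration flow the README describes, based on the `StreamsRegistry` API added in this commit (helper names such as `createMyTopology` and `createMyStreamProperties` are placeholders, not part of the library):

```java
import io.factorhouse.kpow.StreamsRegistry;
import io.factorhouse.kpow.StreamsRegistry.StreamsAgent;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

// Your Kafka Streams topology (placeholder helper)
Topology topology = createMyTopology();

// Your Kafka Streams configuration (bootstrap.servers, application.id, etc.)
Properties props = createMyStreamProperties();

// Your Kafka Streams instance
KafkaStreams streams = new KafkaStreams(topology, props);

// Create the registry; it builds an internal snapshot producer from the
// connection and security fields of props (see filterProperties further below)
StreamsRegistry registry = new StreamsRegistry(props);

// Register your streams instance and topology with Kpow
StreamsAgent agent = registry.register(streams, topology);

// Start your Kafka Streams application as usual
streams.start();
```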

See the [Kpow Multi-Cluster Feature Guide](https://docs.kpow.io/config/multi-cluster) for more information.
See the [Kpow Multi-Cluster Feature Guide](https://docs.factorhouse.io/kpow-ee/config/multi-cluster/) for more information.
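
The collapsed context line above (`StreamsRegistry registry = new StreamsRegistry(primaryProps);`) suggests that in a multi-cluster Kpow installation the registry is constructed with the connection properties of Kpow's primary cluster. A brief, hypothetical sketch (the property-loading helper is a placeholder):

```java
// Hypothetical sketch only: construct the registry against Kpow's primary cluster.
// primaryProps appears in the collapsed diff context above; loadPrimaryClusterProps
// is a placeholder for however you build that configuration.
Properties primaryProps = loadPrimaryClusterProps();
StreamsRegistry registry = new StreamsRegistry(primaryProps);
```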

### Multi-Cluster Kpow Feedback Requested

@@ -195,9 +194,10 @@ You can verify `StreamsRegistry` is sending telemetry to your Kafka Cluster by u

* Select topic `__oprtr_snapshot_state`
* Choose `Transit / JSON` as the key deserializer
* Enter the following kJQ filter for the key: `.[0] == :streams`
* Choose `Last 15 minutes` as the window
* Enter the following kJQ filter: `.key[0] == :streams`

![Data Inspect](docs/data-inspect.png)
![Data Inspect](docs/kpow-data-inspect.png)

# Get Help

Binary file removed docs/data-inspect.png
Binary file added docs/kpow-data-inspect.png
Binary file added docs/kpow-streams-topology.png
Binary file added docs/kpow-streams.png
Binary file removed docs/screenshot.png
Binary file removed docs/streams-ui.png
Binary file removed docs/streams.png
Binary file removed docs/topologies.png
Binary file removed docs/topology-ui.png
26 changes: 13 additions & 13 deletions project.clj
@@ -1,30 +1,30 @@
(defproject io.operatr/kpow-streams-agent "0.2.12"
:description "kPow's Kafka Streams monitoring agent"
:url "https://github.com/operatr-io/kpow-streams-agent"
(defproject io.factorhouse/kpow-streams-agent "0.2.13"
:description "Kpow's Kafka Streams monitoring agent"
:url "https://github.com/factorhouse/kpow-streams-agent"
:license {:name "Apache-2.0 License"
:url "https://www.apache.org/licenses/LICENSE-2.0"
:distribution :repo
:comments "same as Kafka"}
:scm {:name "git" :url "https://github.com/operatr-io/kpow-streams-agent"}
:scm {:name "git" :url "https://github.com/factorhouse/kpow-streams-agent"}
:pom-addition ([:developers
[:developer
[:id "wavejumper"]
[:name "Thomas Crowley"]
[:url "https://operatr.io"]
[:url "https://factorhouse.io"]
[:roles
[:role "developer"]
[:role "maintainer"]]]
[:developer
[:id "d-t-w"]
[:name "Derek Troy-West"]
[:url "https://operatr.io"]
[:url "https://factorhouse.io"]
[:roles
[:role "developer"]
[:role "maintainer"]]]])
:dependencies [[org.clojure/clojure "1.11.2"]
:dependencies [[org.clojure/clojure "1.11.3"]
[com.cognitect/transit-clj "1.0.333"]
[org.clojure/tools.logging "1.3.0"]
[org.apache.kafka/kafka-streams "3.2.0" :scope "provided" :exclusions [com.fasterxml.jackson.core/jackson-core]]]
[org.apache.kafka/kafka-streams "3.6.1" :scope "provided"]]
:uberjar {:prep-tasks ["clean" "javac" "compile"]
:aot :all}
:classifiers [["sources" {:source-paths ^:replace []
@@ -33,13 +33,13 @@
["javadoc" {:source-paths ^:replace []
:java-source-paths ^:replace []
:resource-paths ^:replace ["javadoc"]}]]
:profiles {:kaocha {:dependencies [[lambdaisland/kaocha "1.87.1366"]]}
:profiles {:kaocha {:dependencies [[lambdaisland/kaocha "1.88.1376"]]}
:dev {:resource-paths ["dev-resources"]
:plugins [[lein-cljfmt "0.8.0"]]
:dependencies [[org.slf4j/slf4j-api "2.0.12"]
:plugins [[lein-cljfmt "0.9.2"]]
:dependencies [[org.slf4j/slf4j-api "2.0.13"]
[ch.qos.logback/logback-classic "1.3.14"]
[cheshire "5.12.0" :exclusions [com.fasterxml.jackson.core/jackson-databind]]
[clj-kondo "2024.02.12"]]}
[cheshire "5.13.0" :exclusions [com.fasterxml.jackson.core/jackson-databind]]
[clj-kondo "2024.03.13"]]}
:smoke {:pedantic? :abort}}
:aliases {"kaocha" ["with-profile" "+kaocha" "run" "-m" "kaocha.runner"]
"kondo" ["with-profile" "+smoke" "run" "-m" "clj-kondo.main" "--lint" "src"]
@@ -1,4 +1,4 @@
(ns io.operatr.kpow.agent
(ns io.factorhouse.kpow.agent
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.core.protocols :as p])
@@ -113,7 +113,7 @@
:snapshot/id snapshot-id}
record (ProducerRecord. (:topic snapshot-topic) taxon value)]
(.get (.send producer record))))
(log/infof "kPow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))
(log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))

(defn plan-send
[{:keys [snapshot-topic producer snapshot-id job-id captured]}]
@@ -165,7 +165,7 @@
(Thread/sleep 2000)
(plan-send next-ctx))
(catch Throwable e
(log/errorf e "kPow: error sending streams snapshot for agent %s" id))))
(log/errorf e "Kpow: error sending streams snapshot for agent %s" id))))

(deliver latch true))))

@@ -178,7 +178,7 @@

(defn start-registry
[{:keys [snapshot-topic producer]}]
(log/info "kPow: starting registry")
(log/info "Kpow: starting registry")
(let [registered-topologies (atom {})
pool (Executors/newSingleThreadScheduledExecutor thread-factory)
register-fn (fn [streams topology]
@@ -197,7 +197,7 @@

(defn close-registry
[agent]
(log/info "kPow: closing registry")
(log/info "Kpow: closing registry")
(when-let [close (:close agent)]
(close))
(when-let [registered-topologies (:topologies agent)]
@@ -212,12 +212,12 @@
[agent streams topology]
(when-let [register-fn (:register agent)]
(let [id (register-fn streams topology)]
(log/infof "kPow: registring new streams agent %s" id)
(log/infof "Kpow: registring new streams agent %s" id)
id)))

(defn unregister
[agent ^String id]
(when-let [registered-topologies (:topologies agent)]
(swap! registered-topologies dissoc id)
(log/infof "kPow: unregistered streams agent %s" id)
(log/infof "Kpow: unregistered streams agent %s" id)
true))
@@ -1,4 +1,4 @@
(ns io.operatr.kpow.serdes
(ns io.factorhouse.kpow.serdes
(:require [cognitect.transit :as transit])
(:import (java.io ByteArrayOutputStream ByteArrayInputStream)
(org.apache.kafka.streams.kstream Windowed Window)
117 changes: 117 additions & 0 deletions src/java/io/factorhouse/kpow/StreamsRegistry.java
@@ -0,0 +1,117 @@
package io.factorhouse.kpow;

import clojure.java.api.Clojure;
import clojure.lang.IFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import java.util.ArrayList;
import java.util.Properties;

public class StreamsRegistry implements AutoCloseable {

public static class StreamsAgent {
private final String _id;

StreamsAgent(String id) {
_id = id;
}

public String getId() {
return _id;
}
}

private final Object agent;

public static Properties filterProperties(Properties props) {
ArrayList<String> allowedKeys = new ArrayList<>();
allowedKeys.add("ssl.enabled.protocols");
allowedKeys.add("sasl.client.callback.handler.class");
allowedKeys.add("ssl.endpoint.identification.algorithm");
allowedKeys.add("ssl.provider");
allowedKeys.add("ssl.truststore.location");
allowedKeys.add("ssl.keystore.key");
allowedKeys.add("ssl.key.password");
allowedKeys.add("ssl.protocol");
allowedKeys.add("ssl.keystore.password");
allowedKeys.add("sasl.login.class");
allowedKeys.add("ssl.trustmanager.algorithm");
allowedKeys.add("ssl.keystore.location");
allowedKeys.add("sasl.login.callback.handler.class");
allowedKeys.add("ssl.truststore.certificates");
allowedKeys.add("ssl.cipher.suites");
allowedKeys.add("ssl.truststore.password");
allowedKeys.add("ssl.keymanager.algorithm");
allowedKeys.add("ssl.keystore.type");
allowedKeys.add("ssl.secure.random.implementation");
allowedKeys.add("ssl.truststore.type");
allowedKeys.add("sasl.jaas.config");
allowedKeys.add("ssl.keystore.certificate.chain");
allowedKeys.add("sasl.mechanism");
allowedKeys.add("sasl.oauthbearer.jwks.endpoint.url");
allowedKeys.add("sasl.oauthbearer.token.endpoint.url");
allowedKeys.add("sasl.kerberos.service.name");
allowedKeys.add("security.protocol");
allowedKeys.add("bootstrap.servers");

Properties nextProps = new Properties();
for (String key : allowedKeys) {
if (props.containsKey(key)) {
nextProps.setProperty(key, String.valueOf(props.get(key)));
}
}

String compressionType = props.getProperty("compression.type", "gzip");
nextProps.setProperty("compression.type", compressionType);

String idempotence = props.getProperty("enable.idempotence", "false");
nextProps.setProperty("enable.idempotence", idempotence);

return nextProps;
}

public StreamsRegistry(Properties props) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.serdes"));
IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer");
Serializer keySerializer = (Serializer) serdesFn.invoke();
Serializer valSerializer = (Serializer) serdesFn.invoke();
Properties producerProps = filterProperties(props);
KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer);
agent = agentFn.invoke(producer);
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
String id = (String) registerFn.invoke(agent, streams, topology);
if (id != null) {
return new StreamsAgent(id);
} else {
return null;
}
}

public void unregister(StreamsAgent streamsAgent) {
if (streamsAgent != null) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister");
unregisterFn.invoke(agent, streamsAgent.getId());
}
}

@Override
public void close() {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
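
As an illustration of the `filterProperties` method above (not part of the commit): only the allow-listed connection and security keys survive, and `compression.type` / `enable.idempotence` receive defaults when absent. The key names below are standard Kafka client configs:

```java
// Illustrative only: how filterProperties treats a typical Kafka Streams config.
Properties streamsProps = new Properties();
streamsProps.setProperty("bootstrap.servers", "kafka-1:9092");  // allow-listed: kept
streamsProps.setProperty("security.protocol", "SASL_SSL");      // allow-listed: kept
streamsProps.setProperty("application.id", "my-streams-app");   // not allow-listed: dropped
streamsProps.setProperty("num.stream.threads", "4");            // not allow-listed: dropped

Properties producerProps = StreamsRegistry.filterProperties(streamsProps);
// producerProps now holds bootstrap.servers and security.protocol, plus the
// defaults compression.type=gzip and enable.idempotence=false.
```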
25 changes: 14 additions & 11 deletions src/java/io/operatr/kpow/StreamsRegistry.java
@@ -67,15 +67,18 @@ public static Properties filterProperties(Properties props) {
String compressionType = props.getProperty("compression.type", "gzip");
nextProps.setProperty("compression.type", compressionType);

String idempotence = props.getProperty("enable.idempotence", "false");
nextProps.setProperty("enable.idempotence", idempotence);

return nextProps;
}

public StreamsRegistry(Properties props) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn agentFn = Clojure.var("io.operatr.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.operatr.kpow.serdes"));
IFn serdesFn = Clojure.var("io.operatr.kpow.serdes", "transit-json-serializer");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.serdes"));
IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer");
Serializer keySerializer = (Serializer) serdesFn.invoke();
Serializer valSerializer = (Serializer) serdesFn.invoke();
Properties producerProps = filterProperties(props);
@@ -85,8 +88,8 @@

public StreamsAgent register(KafkaStreams streams, Topology topology) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn registerFn = Clojure.var("io.operatr.kpow.agent", "register");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
String id = (String) registerFn.invoke(agent, streams, topology);
if (id != null) {
return new StreamsAgent(id);
@@ -98,17 +101,17 @@ public StreamsAgent register(KafkaStreams streams, Topology topology) {
public void unregister(StreamsAgent streamsAgent) {
if (streamsAgent != null) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn unregisterFn = Clojure.var("io.operatr.kpow.agent", "unregister");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister");
unregisterFn.invoke(agent, streamsAgent.getId());
}
}

@Override
public void close() throws Exception {
public void close() {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn closeFn = Clojure.var("io.operatr.kpow.agent", "close-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
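
Both entry points now read `enable.idempotence` from the supplied properties before applying the new `false` default, so an application whose ACLs permit idempotent writes can opt back in. A hedged sketch (the helper name is a placeholder):

```java
// Hypothetical override: setting enable.idempotence explicitly restores an
// idempotent snapshot producer, since filterProperties only applies the
// "false" default when the key is absent from the supplied properties.
Properties props = createMyStreamProperties(); // placeholder for your config
props.setProperty("enable.idempotence", "true");

StreamsRegistry registry = new StreamsRegistry(props);
```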