Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update-to-factorhouse #6

Merged
merged 12 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Change Log
All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## [1.0.0] - 2024-05-07
### Changed
- Move to `io.factorhouse` domain from `io.operatr`
- Keep old `io.operatr.kpow.StreamsRegistry` entry point for backwards compatibility
- Bump Kafka Streams to 3.6.1 (same as Kpow)
- Default producer `enable.idempotence` to false (avoid ACL issues with Kafka 3.2.0+)
- Bump other dependencies to latest
- Update readme and images

## [0.2.12] - 2024-04-10
### Changed
- Bump dependencies, fix CVE-2024-22871
Expand Down Expand Up @@ -55,7 +64,6 @@ All notable changes to this project will be documented in this file. This change
### Fixed
- Fixed formatting of logging statement


## [0.2.0] - 2021-23-06
### Added
- `io.operatr.kpow.StreamsRegistry` (initial release)
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@ This repository contains the Kpow Streams Agent.
Use this agent to integrate your Kafka Streams applications with Kpow and unlock the following features:

* See summaries of Kafka Streams activity for your Kafka cluster(s).
* Monitor Kafka Streams metrics (e.g Streams State, Stream-Thread, State Store, RocksDB, etc).
* Visualise Kafka Streams topologies in the Kpow Workflows UI.
* Monitor Kafka Streams metrics (e.g Stream-Thread, State Store, RocksDB, etc).
* Aggregate and Expose Kafka Streams metrics via Kpow [Prometheus Endpoints](https://docs.kpow.io/features/prometheus) (for alerting, etc).
* (Soon) View Kpow Insights of your Kafka Streams applications (outlier metrics, etc).
* Aggregate and Expose Kafka Streams metrics via Kpow [Prometheus Endpoints](https://docs.factorhouse.io/kpow-ee/features/prometheus/) (for alerting, etc).

See the [Kpow Kafka Streams Feature Guide](https://docs.kpow.io/features/kafka-streams) for full documentation.
See the [Kpow Kafka Streams Feature Guide](https://docs.factorhouse.io/kpow-ee/features/kafka-streams/) for full documentation.

See the [Kpow Kafka Streams Spring Word Count Example](https://github.com/factorhouse/kpow-streams-spring-cloud-example) for an integration of Spring, Kafka, and Kpow.

---

![streams-ui](docs/streams.png)
![streams-ui](docs/kpow-streams.png)

---

![topology-ui](docs/topologies.png)
![topology-ui](docs/kpow-streams-topology.png)

# Prerequisites

The Kpow Streams Agent requires a running instance of Kpow.

Evaluate Kpow with the [Kpow Local](https://github.com/factorhouse/kpow-local) repository or see our [Quick Start](https://docs.kpow.io/installation/quick-start) guide.
Evaluate Kpow with the [Kpow Local](https://github.com/factorhouse/kpow-local) repository or see our [Quick Start](https://docs.factorhouse.io/kpow-ee/installation/quick-start/) guide.

# Installation

Expand All @@ -49,11 +48,11 @@ Include the agent as a dependency in your Kafka Streams application.

In your application, just before you start your KafkaStreams instance:

* Create a new [io.operatr.kpow.StreamsRegistry](https://github.com/factorhouse/kpow-streams-agent/blob/main/src/java/io/operatr/kpow/StreamsRegistry.java) instance.
* Create a new [io.factorhouse.kpow.StreamsRegistry](https://github.com/factorhouse/kpow-streams-agent/blob/main/src/java/io/factorhouse/kpow/StreamsRegistry.java) instance.
* Register your KafkaStreams and Topology instances with the StreamsRegistry.

```java
import io.operatr.kpow.StreamsRegistry;
import io.factorhouse.kpow.StreamsRegistry;

// Your Kafka Streams topology
Topology topology = createMyTopology();
Expand Down Expand Up @@ -162,7 +161,7 @@ StreamsRegistry registry = new StreamsRegistry(primaryProps);
...
```

See the [Kpow Multi-Cluster Feature Guide](https://docs.kpow.io/config/multi-cluster) for more information.
See the [Kpow Multi-Cluster Feature Guide](https://docs.factorhouse.io/kpow-ee/config/multi-cluster/) for more information.

### Multi-Cluster Kpow Feedback Requested

Expand Down Expand Up @@ -195,9 +194,10 @@ You can verify `StreamsRegistry` is sending telemetry to your Kafka Cluster by u

* Select topic `__oprtr_snapshot_state`
* Choose `Transit / JSON` as the key deserializer
* Enter the following kJQ filter for the key: `.[0] == :streams`
* Choose `Last 15 minutes` as the window
* Enter the following kJQ filter: `.key[0] == :streams`

![Data Inspect](docs/data-inspect.png)
![Data Inspect](docs/kpow-data-inspect.png)

# Get Help

Expand Down
Binary file removed docs/data-inspect.png
Binary file not shown.
Binary file added docs/kpow-data-inspect.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/kpow-streams-topology.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/kpow-streams.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed docs/screenshot.png
Binary file not shown.
Binary file removed docs/streams-ui.png
Binary file not shown.
Binary file removed docs/streams.png
Binary file not shown.
Binary file removed docs/topologies.png
Binary file not shown.
Binary file removed docs/topology-ui.png
Binary file not shown.
26 changes: 13 additions & 13 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
(defproject io.operatr/kpow-streams-agent "0.2.12"
:description "kPow's Kafka Streams monitoring agent"
:url "https://github.com/operatr-io/kpow-streams-agent"
(defproject io.factorhouse/kpow-streams-agent "0.2.13"
:description "Kpow's Kafka Streams monitoring agent"
:url "https://github.com/factorhouse/kpow-streams-agent"
:license {:name "Apache-2.0 License"
:url "https://www.apache.org/licenses/LICENSE-2.0"
:distribution :repo
:comments "same as Kafka"}
:scm {:name "git" :url "https://github.com/operatr-io/kpow-streams-agent"}
:scm {:name "git" :url "https://github.com/factorhouse/kpow-streams-agent"}
:pom-addition ([:developers
[:developer
[:id "wavejumper"]
[:name "Thomas Crowley"]
[:url "https://operatr.io"]
[:url "https://factorhouse.io"]
[:roles
[:role "developer"]
[:role "maintainer"]]]
[:developer
[:id "d-t-w"]
[:name "Derek Troy-West"]
[:url "https://operatr.io"]
[:url "https://factorhouse.io"]
[:roles
[:role "developer"]
[:role "maintainer"]]]])
:dependencies [[org.clojure/clojure "1.11.2"]
:dependencies [[org.clojure/clojure "1.11.3"]
[com.cognitect/transit-clj "1.0.333"]
[org.clojure/tools.logging "1.3.0"]
[org.apache.kafka/kafka-streams "3.2.0" :scope "provided" :exclusions [com.fasterxml.jackson.core/jackson-core]]]
[org.apache.kafka/kafka-streams "3.6.1" :scope "provided"]]
:uberjar {:prep-tasks ["clean" "javac" "compile"]
:aot :all}
:classifiers [["sources" {:source-paths ^:replace []
Expand All @@ -33,13 +33,13 @@
["javadoc" {:source-paths ^:replace []
:java-source-paths ^:replace []
:resource-paths ^:replace ["javadoc"]}]]
:profiles {:kaocha {:dependencies [[lambdaisland/kaocha "1.87.1366"]]}
:profiles {:kaocha {:dependencies [[lambdaisland/kaocha "1.88.1376"]]}
:dev {:resource-paths ["dev-resources"]
:plugins [[lein-cljfmt "0.8.0"]]
:dependencies [[org.slf4j/slf4j-api "2.0.12"]
:plugins [[lein-cljfmt "0.9.2"]]
:dependencies [[org.slf4j/slf4j-api "2.0.13"]
[ch.qos.logback/logback-classic "1.3.14"]
[cheshire "5.12.0" :exclusions [com.fasterxml.jackson.core/jackson-databind]]
[clj-kondo "2024.02.12"]]}
[cheshire "5.13.0" :exclusions [com.fasterxml.jackson.core/jackson-databind]]
[clj-kondo "2024.03.13"]]}
:smoke {:pedantic? :abort}}
:aliases {"kaocha" ["with-profile" "+kaocha" "run" "-m" "kaocha.runner"]
"kondo" ["with-profile" "+smoke" "run" "-m" "clj-kondo.main" "--lint" "src"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(ns io.operatr.kpow.agent
(ns io.factorhouse.kpow.agent
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.core.protocols :as p])
Expand Down Expand Up @@ -113,7 +113,7 @@
:snapshot/id snapshot-id}
record (ProducerRecord. (:topic snapshot-topic) taxon value)]
(.get (.send producer record))))
(log/infof "kPow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))
(log/infof "Kpow: sent [%s] streams metrics for application.id %s" (count metrics) application-id)))

(defn plan-send
[{:keys [snapshot-topic producer snapshot-id job-id captured]}]
Expand Down Expand Up @@ -165,7 +165,7 @@
(Thread/sleep 2000)
(plan-send next-ctx))
(catch Throwable e
(log/errorf e "kPow: error sending streams snapshot for agent %s" id))))
(log/errorf e "Kpow: error sending streams snapshot for agent %s" id))))

(deliver latch true))))

Expand All @@ -178,7 +178,7 @@

(defn start-registry
[{:keys [snapshot-topic producer]}]
(log/info "kPow: starting registry")
(log/info "Kpow: starting registry")
(let [registered-topologies (atom {})
pool (Executors/newSingleThreadScheduledExecutor thread-factory)
register-fn (fn [streams topology]
Expand All @@ -197,7 +197,7 @@

(defn close-registry
[agent]
(log/info "kPow: closing registry")
(log/info "Kpow: closing registry")
(when-let [close (:close agent)]
(close))
(when-let [registered-topologies (:topologies agent)]
Expand All @@ -212,12 +212,12 @@
[agent streams topology]
(when-let [register-fn (:register agent)]
(let [id (register-fn streams topology)]
(log/infof "kPow: registring new streams agent %s" id)
(log/infof "Kpow: registring new streams agent %s" id)
id)))

(defn unregister
[agent ^String id]
(when-let [registered-topologies (:topologies agent)]
(swap! registered-topologies dissoc id)
(log/infof "kPow: unregistered streams agent %s" id)
(log/infof "Kpow: unregistered streams agent %s" id)
true))
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(ns io.operatr.kpow.serdes
(ns io.factorhouse.kpow.serdes
(:require [cognitect.transit :as transit])
(:import (java.io ByteArrayOutputStream ByteArrayInputStream)
(org.apache.kafka.streams.kstream Windowed Window)
Expand Down
117 changes: 117 additions & 0 deletions src/java/io/factorhouse/kpow/StreamsRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.factorhouse.kpow;

import clojure.java.api.Clojure;
import clojure.lang.IFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import java.util.ArrayList;
import java.util.Properties;

public class StreamsRegistry implements AutoCloseable {

public static class StreamsAgent {
private final String _id;

StreamsAgent(String id) {
_id = id;
}

public String getId() {
return _id;
}
}

private final Object agent;

public static Properties filterProperties(Properties props) {
ArrayList<String> allowedKeys = new ArrayList<>();
allowedKeys.add("ssl.enabled.protocols");
allowedKeys.add("sasl.client.callback.handler.class");
allowedKeys.add("ssl.endpoint.identification.algorithm");
allowedKeys.add("ssl.provider");
allowedKeys.add("ssl.truststore.location");
allowedKeys.add("ssl.keystore.key");
allowedKeys.add("ssl.key.password");
allowedKeys.add("ssl.protocol");
allowedKeys.add("ssl.keystore.password");
allowedKeys.add("sasl.login.class");
allowedKeys.add("ssl.trustmanager.algorithm");
allowedKeys.add("ssl.keystore.location");
allowedKeys.add("sasl.login.callback.handler.class");
allowedKeys.add("ssl.truststore.certificates");
allowedKeys.add("ssl.cipher.suites");
allowedKeys.add("ssl.truststore.password");
allowedKeys.add("ssl.keymanager.algorithm");
allowedKeys.add("ssl.keystore.type");
allowedKeys.add("ssl.secure.random.implementation");
allowedKeys.add("ssl.truststore.type");
allowedKeys.add("sasl.jaas.config");
allowedKeys.add("ssl.keystore.certificate.chain");
allowedKeys.add("sasl.mechanism");
allowedKeys.add("sasl.oauthbearer.jwks.endpoint.url");
allowedKeys.add("sasl.oauthbearer.token.endpoint.url");
allowedKeys.add("sasl.kerberos.service.name");
allowedKeys.add("security.protocol");
allowedKeys.add("bootstrap.servers");

Properties nextProps = new Properties();
for (String key : allowedKeys) {
if (props.containsKey(key)) {
nextProps.setProperty(key, String.valueOf(props.get(key)));
}
}

String compressionType = props.getProperty("compression.type", "gzip");
nextProps.setProperty("compression.type", compressionType);

String idempotence = props.getProperty("enable.idempotence", "false");
nextProps.setProperty("enable.idempotence", idempotence);

return nextProps;
}

public StreamsRegistry(Properties props) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.serdes"));
IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer");
Serializer keySerializer = (Serializer) serdesFn.invoke();
Serializer valSerializer = (Serializer) serdesFn.invoke();
Properties producerProps = filterProperties(props);
KafkaProducer producer = new KafkaProducer<>(producerProps, keySerializer, valSerializer);
agent = agentFn.invoke(producer);
}

public StreamsAgent register(KafkaStreams streams, Topology topology) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
String id = (String) registerFn.invoke(agent, streams, topology);
if (id != null) {
return new StreamsAgent(id);
} else {
return null;
}
}

public void unregister(StreamsAgent streamsAgent) {
if (streamsAgent != null) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister");
unregisterFn.invoke(agent, streamsAgent.getId());
}
}

@Override
public void close() {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
25 changes: 14 additions & 11 deletions src/java/io/operatr/kpow/StreamsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ public static Properties filterProperties(Properties props) {
String compressionType = props.getProperty("compression.type", "gzip");
nextProps.setProperty("compression.type", compressionType);

String idempotence = props.getProperty("enable.idempotence", "false");
nextProps.setProperty("enable.idempotence", idempotence);

return nextProps;
}

public StreamsRegistry(Properties props) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn agentFn = Clojure.var("io.operatr.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.operatr.kpow.serdes"));
IFn serdesFn = Clojure.var("io.operatr.kpow.serdes", "transit-json-serializer");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn agentFn = Clojure.var("io.factorhouse.kpow.agent", "init-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.serdes"));
IFn serdesFn = Clojure.var("io.factorhouse.kpow.serdes", "transit-json-serializer");
Serializer keySerializer = (Serializer) serdesFn.invoke();
Serializer valSerializer = (Serializer) serdesFn.invoke();
Properties producerProps = filterProperties(props);
Expand All @@ -85,8 +88,8 @@ public StreamsRegistry(Properties props) {

public StreamsAgent register(KafkaStreams streams, Topology topology) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn registerFn = Clojure.var("io.operatr.kpow.agent", "register");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn registerFn = Clojure.var("io.factorhouse.kpow.agent", "register");
String id = (String) registerFn.invoke(agent, streams, topology);
if (id != null) {
return new StreamsAgent(id);
Expand All @@ -98,17 +101,17 @@ public StreamsAgent register(KafkaStreams streams, Topology topology) {
public void unregister(StreamsAgent streamsAgent) {
if (streamsAgent != null) {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn unregisterFn = Clojure.var("io.operatr.kpow.agent", "unregister");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn unregisterFn = Clojure.var("io.factorhouse.kpow.agent", "unregister");
unregisterFn.invoke(agent, streamsAgent.getId());
}
}

@Override
public void close() throws Exception {
public void close() {
IFn require = Clojure.var("clojure.core", "require");
require.invoke(Clojure.read("io.operatr.kpow.agent"));
IFn closeFn = Clojure.var("io.operatr.kpow.agent", "close-registry");
require.invoke(Clojure.read("io.factorhouse.kpow.agent"));
IFn closeFn = Clojure.var("io.factorhouse.kpow.agent", "close-registry");
closeFn.invoke(agent);
}
}
Loading