From 7861bbd6c80e2e33d8948e21b15893966ce4b33f Mon Sep 17 00:00:00 2001 From: David Bernal Date: Thu, 31 Aug 2023 19:01:35 -0400 Subject: [PATCH 1/4] check types against interface instead of concrete class --- src/jackdaw/client.clj | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index cbcd0bd7..6da3ab8d 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -155,11 +155,11 @@ metadata about the partitions assigned to the given consumer or producer." [producer-or-consumer {:keys [^String topic-name]}] - (->> (cond (instance? KafkaConsumer producer-or-consumer) - (.partitionsFor ^KafkaConsumer producer-or-consumer topic-name) + (->> (cond (instance? Consumer producer-or-consumer) + (.partitionsFor ^Consumer producer-or-consumer topic-name) - (instance? KafkaProducer producer-or-consumer) - (.partitionsFor ^KafkaProducer producer-or-consumer topic-name) + (instance? Producer producer-or-consumer) + (.partitionsFor ^Producer producer-or-consumer topic-name) :else (throw (ex-info "Got non producer/consumer!" {:inst producer-or-consumer @@ -282,8 +282,8 @@ (let [topic-partitions (->> (mapcat #(partitions-for consumer %) topics) (map #(select-keys % [:topic-name :partition]))) ts-offsets (offsets-for-times consumer - (zipmap topic-partitions - (repeat (count topic-partitions) timestamp))) + (zipmap topic-partitions + (repeat (count topic-partitions) timestamp))) end-offsets (end-offsets consumer topic-partitions) offsets (reduce-kv (fn [m k v] (assoc m k {:ts-offset v From 02cafcf973f4730662e77e572e0be8eb09fae393 Mon Sep 17 00:00:00 2001 From: David Bernal Date: Thu, 31 Aug 2023 19:02:54 -0400 Subject: [PATCH 2/4] add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 494b5c0b..32e53390 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Unreleased +- Fix partitions-for checking against concrete class instead of interface. Prevents mocked Consumer and Producer instances from working during testing. + ### [0.9.11] - [2023-04-25] - v0.9.10 introduced some reflection warnings. Whilst harmless seeing `.deleteTopics` appear in your logs at app startup is a little unsettling. [#358](https://github.com/FundingCircle/jackdaw/pull/358) - fix to re-introduce correct handling of timestamped consumer records. Broke test asserting windowed results. [#356](https://github.com/FundingCircle/jackdaw/pull/356) From 61908738bd0b58d0099c3a781271b645c5e5bf55 Mon Sep 17 00:00:00 2001 From: David Bernal Date: Thu, 31 Aug 2023 19:01:35 -0400 Subject: [PATCH 3/4] check types against interface instead of concrete class --- src/jackdaw/admin.clj | 51 +++++++++++++++++++++++++++++++++++--- src/jackdaw/client.clj | 13 +++++----- src/jackdaw/data/admin.clj | 24 ++++++++++++++++-- 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/src/jackdaw/admin.clj b/src/jackdaw/admin.clj index 18ad0b55..7ed93138 100644 --- a/src/jackdaw/admin.clj +++ b/src/jackdaw/admin.clj @@ -11,7 +11,7 @@ [manifold.deferred :as d]) (:import [java.util Collection Properties] [org.apache.kafka.clients.admin AdminClient - DescribeTopicsOptions DescribeClusterOptions DescribeConfigsOptions])) + DescribeTopicsOptions DescribeClusterOptions DescribeConfigsOptions DescribeConsumerGroupsOptions ConsumerGroupListing ConsumerGroupDescription])) (set! *warn-on-reflection* true) @@ -22,6 +22,8 @@ (describe-topics* [this topics]) (describe-configs* [this configs]) (describe-cluster* [this]) + (describe-consumer-groups* [this group-ids]) + (list-consumer-groups* [this]) (list-topics* [this])) (def client-impl @@ -29,8 +31,8 @@ (d/future @(.all (.alterConfigs ^AdminClient this topics)))) :create-topics* (fn [this topics] - (d/future - @(.all (.createTopics ^AdminClient this ^Collection topics)))) + (d/future + @(.all (.createTopics ^AdminClient this ^Collection topics)))) :delete-topics* (fn [this topics] (d/future @(.all (.deleteTopics ^AdminClient this ^Collection topics)))) @@ -43,6 +45,12 @@ :describe-cluster* (fn [this] (d/future (jd/datafy (.describeCluster ^AdminClient this (DescribeClusterOptions.))))) + :describe-consumer-groups* (fn [this group-ids] + (d/future + @(.all (.describeConsumerGroups ^AdminClient this group-ids (DescribeConsumerGroupsOptions.))))) + :list-consumer-groups* (fn [this] + (d/future + @(.all (.listConsumerGroups ^AdminClient this)))) :list-topics* (fn [this] (d/future @(.names (.listTopics ^AdminClient this))))}) @@ -65,6 +73,17 @@ [x] (instance? AdminClient x)) +(defn list-consumer-groups + "Given an `AdminClient`, return a seq of consumer groups, being the + consumer groups on the cluster." + [^AdminClient client] + {:pre [(client? client)]} + (->> @(list-consumer-groups* client) + ;; We should allow the caller to decide whether they want + ;; the result to be sorted or not? + (map #(.groupId ^ConsumerGroupListing %)) + sort)) + (defn list-topics "Given an `AdminClient`, return a seq of topic records, being the topics on the cluster." @@ -146,6 +165,25 @@ (assoc m (jd/datafy k) (jd/datafy v))) {}))) +(defn describe-consumer-groups + "Given an `AdminClient` and an optional collection of topic + descriptors, return a map from topic names to topic + descriptions. + + If no topics are provided, describes all topics. + + Note that the topic description does NOT include the topic's + configuration.See `#'describe-topic-config` for that capability." + ([^AdminClient client] + {:pre [(client? client)]} + (describe-consumer-groups client (list-consumer-groups client))) + ([^AdminClient client group-ids] + {:pre [(client? client) + (sequential? group-ids)]} + (->> @(describe-consumer-groups* client group-ids) + (map (fn [[k v]] [k (jd/datafy v)])) + (into {})))) + (defn topics-ready? "Given an `AdminClient` and a sequence topic descriptors, return `true` if and only if all listed topics have a leader and in-sync @@ -228,3 +266,10 @@ {:pre [(client? client)]} (-> @(describe-configs* client [(jd/->broker-resource (str broker-id))]) vals first jd/datafy)) + +(comment (def client (AdminClient/create ^Properties (jd/map->Properties {"bootstrap.servers" "b-2.data-services-prd.6ktv70.c4.kafka.us-east-1.amazonaws.com:9092"}))) + + (def consumer-group-result (describe-consumer-groups client)) + + (map #(spit "topics.txt" (str % "\n") :append true) (clojure.set/difference (set (map :topic-name (list-topics client))) (set (flatten (map (fn [{:keys [members]}] (map (fn [{{:keys [topic-partitions]} :assignment}] (map :topic-name topic-partitions)) members)) (vals consumer-group-result)))))) + ) \ No newline at end of file diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index 6509de85..57f9bc19 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -155,11 +155,11 @@ metadata about the partitions assigned to the given consumer or producer." [producer-or-consumer {:keys [^String topic-name]}] - (->> (cond (instance? KafkaConsumer producer-or-consumer) - (.partitionsFor ^KafkaConsumer producer-or-consumer topic-name) + (->> (cond (instance? Consumer producer-or-consumer) + (.partitionsFor ^Consumer producer-or-consumer topic-name) - (instance? KafkaProducer producer-or-consumer) - (.partitionsFor ^KafkaProducer producer-or-consumer topic-name) + (instance? Producer producer-or-consumer) + (.partitionsFor ^Producer producer-or-consumer topic-name) :else (throw (ex-info "Got non producer/consumer!" {:inst producer-or-consumer @@ -283,8 +283,9 @@ (map #(select-keys % [:topic-name :partition]))) end-offsets (end-offsets consumer topic-partitions) ts-offsets (offsets-for-times consumer - (zipmap topic-partitions - (repeat (count topic-partitions) timestamp))) + (zipmap topic-partitions + (repeat (count topic-partitions) timestamp))) + end-offsets (end-offsets consumer topic-partitions) offsets (reduce-kv (fn [m k v] (assoc m k {:ts-offset v :end-offset (get end-offsets k)})) diff --git a/src/jackdaw/data/admin.clj b/src/jackdaw/data/admin.clj index 61ef07df..cfc357c6 100644 --- a/src/jackdaw/data/admin.clj +++ b/src/jackdaw/data/admin.clj @@ -2,7 +2,8 @@ (import '[org.apache.kafka.clients.admin Config ConfigEntry DescribeClusterResult NewTopic - TopicDescription]) + TopicDescription ConsumerGroupDescription MemberDescription + MemberAssignment]) (set! *warn-on-reflection* true) @@ -45,6 +46,25 @@ {:is-internal? (.isInternal td) :partition-info (map datafy (.partitions td))}) +;;; ConsumerGroupDescription + +(defn->data ConsumerGroupDescription->data + [^ConsumerGroupDescription cd] + {:is-simple-consumer-group (.isSimpleConsumerGroup cd) + :members (map datafy (.members cd))}) + +;;; MemberDescription + +(defn->data MemberDescription->data + [^MemberDescription md] + {:assignment (datafy (.assignment md))}) + +;; MemberAssignment + +(defn->data MemberAssignment->data + [^MemberAssignment ma] + {:topic-partitions (map datafy (.topicPartitions ma))}) + ;;; NewTopic (defn map->NewTopic @@ -69,4 +89,4 @@ [^DescribeClusterResult dcr] {:cluster-id (-> dcr .clusterId .get) :controller (-> dcr .controller .get datafy) - :nodes (->> dcr .nodes .get (mapv datafy))}) + :nodes (->> dcr .nodes .get (mapv datafy))}) \ No newline at end of file From 6128f2f633db9823933e9a3ba04eb01c4366472d Mon Sep 17 00:00:00 2001 From: David Bernal Date: Thu, 31 Aug 2023 19:02:54 -0400 Subject: [PATCH 4/4] add changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68c93a2a..d3ecf75b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ### Unreleased +### [0.9.13] - [2025-05-16] +- partitions-for checks against the interface to allow for better mocking + ### [0.9.12] - [2023-12-05] - Support for Foreign Key joins [#365](https://github.com/FundingCircle/jackdaw/pull/365) (Issue [#364]) - add manifold and keep aleph in dev dependencies [#360](https://github.com/FundingCircle/jackdaw/pull/360). Users of test-machine will have to add aleph to the test deps in their app.