Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

### Unreleased

### [0.9.13] - [2025-05-16]
- Fix partitions-for checking against concrete class instead of interface. Prevents mocked Consumer and Producer instances from working during testing.

### [0.9.12] - [2023-12-05]
- Support for Foreign Key joins [#365](https://github.com/FundingCircle/jackdaw/pull/365) (Issue [#364])
- add manifold and keep aleph in dev dependencies [#360](https://github.com/FundingCircle/jackdaw/pull/360). Users of test-machine will have to add aleph to the test deps in their app.
Expand Down
51 changes: 48 additions & 3 deletions src/jackdaw/admin.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[manifold.deferred :as d])
(:import [java.util Collection Properties]
[org.apache.kafka.clients.admin AdminClient
DescribeTopicsOptions DescribeClusterOptions DescribeConfigsOptions]))
DescribeTopicsOptions DescribeClusterOptions DescribeConfigsOptions DescribeConsumerGroupsOptions ConsumerGroupListing ConsumerGroupDescription]))

(set! *warn-on-reflection* true)

Expand All @@ -22,15 +22,17 @@
(describe-topics* [this topics])
(describe-configs* [this configs])
(describe-cluster* [this])
(describe-consumer-groups* [this group-ids])
(list-consumer-groups* [this])
(list-topics* [this]))

(def client-impl
{:alter-topics* (fn [this topics]
(d/future
@(.all (.alterConfigs ^AdminClient this topics))))
:create-topics* (fn [this topics]
(d/future
@(.all (.createTopics ^AdminClient this ^Collection topics))))
(d/future
@(.all (.createTopics ^AdminClient this ^Collection topics))))
:delete-topics* (fn [this topics]
(d/future
@(.all (.deleteTopics ^AdminClient this ^Collection topics))))
Expand All @@ -43,6 +45,12 @@
:describe-cluster* (fn [this]
(d/future
(jd/datafy (.describeCluster ^AdminClient this (DescribeClusterOptions.)))))
:describe-consumer-groups* (fn [this group-ids]
(d/future
@(.all (.describeConsumerGroups ^AdminClient this group-ids (DescribeConsumerGroupsOptions.)))))
:list-consumer-groups* (fn [this]
(d/future
@(.all (.listConsumerGroups ^AdminClient this))))
:list-topics* (fn [this]
(d/future
@(.names (.listTopics ^AdminClient this))))})
Expand All @@ -65,6 +73,17 @@
[x]
(instance? AdminClient x))

(defn list-consumer-groups
"Given an `AdminClient`, return a seq of consumer groups, being the
consumer groups on the cluster."
[^AdminClient client]
{:pre [(client? client)]}
(->> @(list-consumer-groups* client)
;; We should allow the caller to decide whether they want
;; the result to be sorted or not?
(map #(.groupId ^ConsumerGroupListing %))
sort))

(defn list-topics
"Given an `AdminClient`, return a seq of topic records, being the
topics on the cluster."
Expand Down Expand Up @@ -146,6 +165,25 @@
(assoc m (jd/datafy k) (jd/datafy v)))
{})))

(defn describe-consumer-groups
"Given an `AdminClient` and an optional collection of topic
descriptors, return a map from topic names to topic
descriptions.

If no topics are provided, describes all topics.

Note that the topic description does NOT include the topic's
configuration.See `#'describe-topic-config` for that capability."
([^AdminClient client]
{:pre [(client? client)]}
(describe-consumer-groups client (list-consumer-groups client)))
([^AdminClient client group-ids]
{:pre [(client? client)
(sequential? group-ids)]}
(->> @(describe-consumer-groups* client group-ids)
(map (fn [[k v]] [k (jd/datafy v)]))
(into {}))))

(defn topics-ready?
"Given an `AdminClient` and a sequence topic descriptors, return
`true` if and only if all listed topics have a leader and in-sync
Expand Down Expand Up @@ -228,3 +266,10 @@
{:pre [(client? client)]}
(-> @(describe-configs* client [(jd/->broker-resource (str broker-id))])
vals first jd/datafy))

(comment (def client (AdminClient/create ^Properties (jd/map->Properties {"bootstrap.servers" "b-2.data-services-prd.6ktv70.c4.kafka.us-east-1.amazonaws.com:9092"})))

(def consumer-group-result (describe-consumer-groups client))

(map #(spit "topics.txt" (str % "\n") :append true) (clojure.set/difference (set (map :topic-name (list-topics client))) (set (flatten (map (fn [{:keys [members]}] (map (fn [{{:keys [topic-partitions]} :assignment}] (map :topic-name topic-partitions)) members)) (vals consumer-group-result))))))
)
13 changes: 7 additions & 6 deletions src/jackdaw/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@
metadata about the partitions assigned to the given consumer or
producer."
[producer-or-consumer {:keys [^String topic-name]}]
(->> (cond (instance? KafkaConsumer producer-or-consumer)
(.partitionsFor ^KafkaConsumer producer-or-consumer topic-name)
(->> (cond (instance? Consumer producer-or-consumer)
(.partitionsFor ^Consumer producer-or-consumer topic-name)

(instance? KafkaProducer producer-or-consumer)
(.partitionsFor ^KafkaProducer producer-or-consumer topic-name)
(instance? Producer producer-or-consumer)
(.partitionsFor ^Producer producer-or-consumer topic-name)

:else (throw (ex-info "Got non producer/consumer!"
{:inst producer-or-consumer
Expand Down Expand Up @@ -283,8 +283,9 @@
(map #(select-keys % [:topic-name :partition])))
end-offsets (end-offsets consumer topic-partitions)
ts-offsets (offsets-for-times consumer
(zipmap topic-partitions
(repeat (count topic-partitions) timestamp)))
(zipmap topic-partitions
(repeat (count topic-partitions) timestamp)))
end-offsets (end-offsets consumer topic-partitions)
offsets (reduce-kv (fn [m k v]
(assoc m k {:ts-offset v
:end-offset (get end-offsets k)}))
Expand Down
24 changes: 22 additions & 2 deletions src/jackdaw/data/admin.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

(import '[org.apache.kafka.clients.admin
Config ConfigEntry DescribeClusterResult NewTopic
TopicDescription])
TopicDescription ConsumerGroupDescription MemberDescription
MemberAssignment])

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -45,6 +46,25 @@
{:is-internal? (.isInternal td)
:partition-info (map datafy (.partitions td))})

;;; ConsumerGroupDescription

(defn->data ConsumerGroupDescription->data
[^ConsumerGroupDescription cd]
{:is-simple-consumer-group (.isSimpleConsumerGroup cd)
:members (map datafy (.members cd))})

;;; MemberDescription

(defn->data MemberDescription->data
[^MemberDescription md]
{:assignment (datafy (.assignment md))})

;; MemberAssignment

(defn->data MemberAssignment->data
[^MemberAssignment ma]
{:topic-partitions (map datafy (.topicPartitions ma))})

;;; NewTopic

(defn map->NewTopic
Expand All @@ -69,4 +89,4 @@
[^DescribeClusterResult dcr]
{:cluster-id (-> dcr .clusterId .get)
:controller (-> dcr .controller .get datafy)
:nodes (->> dcr .nodes .get (mapv datafy))})
:nodes (->> dcr .nodes .get (mapv datafy))})
Loading