Skip to content

vrih/gregor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

57 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Current Version:

[io.weft/gregor "0.3.0"]

API

Gregor

Lightweight Clojure bindings for Apache Kafka 0.9.X and up.

Gregor wraps most of the Java API for the Kafka Producer and New Consumer and is almost feature complete as of 0.2.0. The intent of this project is to stay very close to the Kafka API instead of adding more advanced features.

Example

Here's an example of at-least-once processing (using the excellent mount):

(ns gregor-sample-app.core
  (:gen-class)
  (:require [clojure.repl :as repl]
            [gregor.core :as gregor]
            [mount.core :as mount :refer [defstate]]))

(def run (atom true))

(defstate consumer
  :start (gregor/consumer "localhost:9092"
                          "testgroup"
                          ["test-topic"]
                          {"auto.offset.reset" "earliest"
                           "enable.auto.commit" "false"})
  :stop (gregor/close consumer))

(defstate producer
  :start (gregor/producer "localhost:9092")
  :stop (gregor/close producer))

(defn -main
  [& args]
  (mount/start)
  (repl/set-break-handler! (fn [sig] (reset! run false)))
  (while @run
    (let [consumer-records (gregor/poll consumer)
          values (process-records consumer-records)]
      (doseq [v values]
        (gregor/send producer "other-topic" v))
      (gregor/commit-offsets! consumer)))
  (mount/stop))

Transformations over consumer records are applied in process-records. Each record in the seq returned by poll is a map. Here's an example with a JSON object as the :value:

{:value "{\"foo\":42}"
 :key nil
 :partition 0
 :topic "test-topic"
 :offset 939}

Producing

Gregor provides the send function for asynchronously sending a record to a topic. There are multiple arities which correspond to those of the ProducerRecord Java constructor. If you'd like to provide a callback to be invoked when the send has been acknowledged use send-then instead.

About

Clojure bindings for Kafka 0.9+

Resources

Stars

Watchers

Forks

Packages

No packages published