From 27e9a7519b1f749c4e8d42573e27aff6b4acd8e9 Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Sat, 19 Oct 2024 09:05:53 +0200 Subject: [PATCH] Add consumer example --- examples/simple-consumer/build.gradle.kts | 16 ++++ .../src/main/java/rs/iggy/SimpleConsumer.java | 95 +++++++++++++++++++ settings.gradle.kts | 4 + 3 files changed, 115 insertions(+) create mode 100644 examples/simple-consumer/build.gradle.kts create mode 100644 examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java diff --git a/examples/simple-consumer/build.gradle.kts b/examples/simple-consumer/build.gradle.kts new file mode 100644 index 0000000..e332cc2 --- /dev/null +++ b/examples/simple-consumer/build.gradle.kts @@ -0,0 +1,16 @@ +plugins { + id("java") +} + +group = "rs.iggy" +version = "0.0.1-SNAPSHOT" + +repositories { + mavenCentral() +} + +dependencies { + implementation(project(":sdk")) + implementation("org.slf4j:slf4j-api:2.0.9") + runtimeOnly("ch.qos.logback:logback-classic:1.4.12") +} diff --git a/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java b/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java new file mode 100644 index 0000000..31da655 --- /dev/null +++ b/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java @@ -0,0 +1,95 @@ +package rs.iggy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rs.iggy.clients.blocking.tcp.IggyTcpClient; +import rs.iggy.consumergroup.Consumer; +import rs.iggy.consumergroup.ConsumerGroupDetails; +import rs.iggy.identifier.ConsumerId; +import rs.iggy.identifier.StreamId; +import rs.iggy.identifier.TopicId; +import rs.iggy.message.Message; +import rs.iggy.message.PolledMessages; +import rs.iggy.message.PollingKind; +import rs.iggy.message.PollingStrategy; +import rs.iggy.stream.StreamDetails; +import rs.iggy.topic.CompressionAlgorithm; +import rs.iggy.topic.TopicDetails; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import static java.util.Optional.empty; + +public class SimpleConsumer { + + private static final String STREAM_NAME = "dev01"; + private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME); + private static final String TOPIC_NAME = "events"; + private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME); + private static final String GROUP_NAME = "simple-consumer"; + private static final ConsumerId GROUP_ID = ConsumerId.of(GROUP_NAME); + private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class); + + public static void main(String[] args) { + IggyTcpClient client = new IggyTcpClient("localhost", 8090); + client.users().login("iggy", "iggy"); + + createStream(client); + createTopic(client); + createConsumerGroup(client); + client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID); + + List messages = new ArrayList<>(); + while (messages.size() < 1000) { + PolledMessages polledMessages = client.messages() + .pollMessages(STREAM_ID, + TOPIC_ID, + empty(), + new Consumer(Consumer.Kind.ConsumerGroup, GROUP_ID), + new PollingStrategy(PollingKind.Next, BigInteger.ZERO), + 10L, + true); + messages.addAll(polledMessages.messages()); + log.debug("Fetched {} messages from partition {}, current offset {}", + polledMessages.messages().size(), + polledMessages.partitionId(), + polledMessages.currentOffset()); + } + } + + private static void createStream(IggyTcpClient client) { + Optional stream = client.streams().getStream(STREAM_ID); + if (stream.isPresent()) { + return; + } + client.streams().createStream(empty(), STREAM_NAME); + } + + private static void createTopic(IggyTcpClient client) { + Optional topic = client.topics().getTopic(STREAM_ID, TOPIC_ID); + if (topic.isPresent()) { + return; + } + client.topics() + .createTopic(STREAM_ID, + empty(), + 1L, + CompressionAlgorithm.none, + BigInteger.ZERO, + BigInteger.ZERO, + empty(), + TOPIC_NAME); + + } + + private static void createConsumerGroup(IggyTcpClient client) { + Optional consumerGroup = client.consumerGroups() + .getConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID); + if (consumerGroup.isPresent()) { + return; + } + client.consumerGroups().createConsumerGroup(STREAM_ID, TOPIC_ID, empty(), GROUP_NAME); + } + +} diff --git a/settings.gradle.kts b/settings.gradle.kts index beb2689..0128085 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,2 +1,6 @@ rootProject.name = "iggy-java-client" + include("sdk") + +include("simple-consumer-example") +project(":simple-consumer-example").projectDir = file("examples/simple-consumer")