diff --git a/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java b/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java index caccded..9036651 100644 --- a/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java +++ b/examples/simple-consumer/src/main/java/rs/iggy/SimpleConsumer.java @@ -10,7 +10,6 @@ import rs.iggy.identifier.TopicId; import rs.iggy.message.PolledMessage; import rs.iggy.message.PolledMessages; -import rs.iggy.message.PollingKind; import rs.iggy.message.PollingStrategy; import rs.iggy.stream.StreamDetails; import rs.iggy.topic.CompressionAlgorithm; @@ -46,8 +45,8 @@ public static void main(String[] args) { .pollMessages(STREAM_ID, TOPIC_ID, empty(), - new Consumer(Consumer.Kind.ConsumerGroup, GROUP_ID), - new PollingStrategy(PollingKind.Next, BigInteger.ZERO), + Consumer.group(GROUP_ID), + PollingStrategy.next(), 10L, true); messages.addAll(polledMessages.messages()); diff --git a/examples/simple-producer/src/main/java/rs/iggy/SimpleProducer.java b/examples/simple-producer/src/main/java/rs/iggy/SimpleProducer.java index 3dbbf78..a462c57 100644 --- a/examples/simple-producer/src/main/java/rs/iggy/SimpleProducer.java +++ b/examples/simple-producer/src/main/java/rs/iggy/SimpleProducer.java @@ -5,10 +5,8 @@ import rs.iggy.clients.blocking.tcp.IggyTcpClient; import rs.iggy.identifier.StreamId; import rs.iggy.identifier.TopicId; -import rs.iggy.message.BigIntegerMessageId; import rs.iggy.message.Message; import rs.iggy.message.Partitioning; -import rs.iggy.message.PartitioningKind; import rs.iggy.stream.StreamDetails; import rs.iggy.topic.CompressionAlgorithm; import rs.iggy.topic.TopicDetails; @@ -33,13 +31,8 @@ public static void main(String[] args) { int counter = 0; while (counter++ < 1000) { - var text = "message from simple producer " + counter; - var message = new Message(new BigIntegerMessageId(BigInteger.ZERO), text.getBytes(), empty()); - client.messages() - .sendMessages(STREAM_ID, - TOPIC_ID, - new Partitioning(PartitioningKind.Balanced, new byte[0]), - singletonList(message)); + var message = Message.of("message from simple producer " + counter); + client.messages().sendMessages(STREAM_ID, TOPIC_ID, Partitioning.balanced(), singletonList(message)); log.debug("Message {} sent", counter); } diff --git a/java-sdk/src/main/java/rs/iggy/consumergroup/Consumer.java b/java-sdk/src/main/java/rs/iggy/consumergroup/Consumer.java index b30a739..1b7d357 100644 --- a/java-sdk/src/main/java/rs/iggy/consumergroup/Consumer.java +++ b/java-sdk/src/main/java/rs/iggy/consumergroup/Consumer.java @@ -8,6 +8,18 @@ public static Consumer of(Long id) { return new Consumer(Kind.Consumer, ConsumerId.of(id)); } + public static Consumer of(ConsumerId id) { + return new Consumer(Kind.Consumer, id); + } + + public static Consumer group(Long id) { + return new Consumer(Kind.ConsumerGroup, ConsumerId.of(id)); + } + + public static Consumer group(ConsumerId id) { + return new Consumer(Kind.ConsumerGroup, id); + } + public enum Kind { Consumer(1), ConsumerGroup(2); diff --git a/java-sdk/src/main/java/rs/iggy/message/BigIntegerMessageId.java b/java-sdk/src/main/java/rs/iggy/message/BigIntegerMessageId.java index e1df6ad..66f0064 100644 --- a/java-sdk/src/main/java/rs/iggy/message/BigIntegerMessageId.java +++ b/java-sdk/src/main/java/rs/iggy/message/BigIntegerMessageId.java @@ -6,12 +6,17 @@ public class BigIntegerMessageId implements MessageId { + private static final BigIntegerMessageId DEFAULT_ID = new BigIntegerMessageId(BigInteger.ZERO); private final BigInteger value; public BigIntegerMessageId(BigInteger value) { this.value = value; } + public static BigIntegerMessageId defaultId() { + return DEFAULT_ID; + } + @Override public BigInteger toBigInteger() { return value; diff --git a/java-sdk/src/main/java/rs/iggy/message/Message.java b/java-sdk/src/main/java/rs/iggy/message/Message.java index 0e14ce1..c94decd 100644 --- a/java-sdk/src/main/java/rs/iggy/message/Message.java +++ b/java-sdk/src/main/java/rs/iggy/message/Message.java @@ -8,4 +8,9 @@ public record Message( byte[] payload, Optional> headers ) { + + public static Message of(String payload) { + return new Message(MessageId.serverGenerated(), payload.getBytes(), Optional.empty()); + } + } diff --git a/java-sdk/src/main/java/rs/iggy/message/MessageId.java b/java-sdk/src/main/java/rs/iggy/message/MessageId.java index 9c1c8ca..388d1b7 100644 --- a/java-sdk/src/main/java/rs/iggy/message/MessageId.java +++ b/java-sdk/src/main/java/rs/iggy/message/MessageId.java @@ -7,6 +7,10 @@ public interface MessageId { + static MessageId serverGenerated() { + return BigIntegerMessageId.defaultId(); + } + @JsonCreator static MessageId from(BigInteger bigInteger) { return new BigIntegerMessageId(bigInteger); diff --git a/java-sdk/src/main/java/rs/iggy/message/PollingStrategy.java b/java-sdk/src/main/java/rs/iggy/message/PollingStrategy.java index 3aef5e2..87fa511 100644 --- a/java-sdk/src/main/java/rs/iggy/message/PollingStrategy.java +++ b/java-sdk/src/main/java/rs/iggy/message/PollingStrategy.java @@ -3,4 +3,25 @@ import java.math.BigInteger; public record PollingStrategy(PollingKind kind, BigInteger value) { + + public static PollingStrategy offset(BigInteger value) { + return new PollingStrategy(PollingKind.Offset, value); + } + + public static PollingStrategy timestamp(BigInteger value) { + return new PollingStrategy(PollingKind.Timestamp, value); + } + + public static PollingStrategy first() { + return new PollingStrategy(PollingKind.First, BigInteger.ZERO); + } + + public static PollingStrategy last() { + return new PollingStrategy(PollingKind.Last, BigInteger.ZERO); + } + + public static PollingStrategy next() { + return new PollingStrategy(PollingKind.Next, BigInteger.ZERO); + } + }