Skip to content

Commit

Permalink
Implement factory methods for various models
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 23, 2024
1 parent 2c9b5d9 commit 4132255
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import rs.iggy.identifier.TopicId;
import rs.iggy.message.PolledMessage;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingKind;
import rs.iggy.message.PollingStrategy;
import rs.iggy.stream.StreamDetails;
import rs.iggy.topic.CompressionAlgorithm;
Expand Down Expand Up @@ -46,8 +45,8 @@ public static void main(String[] args) {
.pollMessages(STREAM_ID,
TOPIC_ID,
empty(),
new Consumer(Consumer.Kind.ConsumerGroup, GROUP_ID),
new PollingStrategy(PollingKind.Next, BigInteger.ZERO),
Consumer.group(GROUP_ID),
PollingStrategy.next(),
10L,
true);
messages.addAll(polledMessages.messages());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import rs.iggy.clients.blocking.tcp.IggyTcpClient;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.BigIntegerMessageId;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PartitioningKind;
import rs.iggy.stream.StreamDetails;
import rs.iggy.topic.CompressionAlgorithm;
import rs.iggy.topic.TopicDetails;
Expand All @@ -33,13 +31,8 @@ public static void main(String[] args) {

int counter = 0;
while (counter++ < 1000) {
var text = "message from simple producer " + counter;
var message = new Message(new BigIntegerMessageId(BigInteger.ZERO), text.getBytes(), empty());
client.messages()
.sendMessages(STREAM_ID,
TOPIC_ID,
new Partitioning(PartitioningKind.Balanced, new byte[0]),
singletonList(message));
var message = Message.of("message from simple producer " + counter);
client.messages().sendMessages(STREAM_ID, TOPIC_ID, Partitioning.balanced(), singletonList(message));
log.debug("Message {} sent", counter);
}

Expand Down
12 changes: 12 additions & 0 deletions java-sdk/src/main/java/rs/iggy/consumergroup/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ public static Consumer of(Long id) {
return new Consumer(Kind.Consumer, ConsumerId.of(id));
}

public static Consumer of(ConsumerId id) {
return new Consumer(Kind.Consumer, id);
}

public static Consumer group(Long id) {
return new Consumer(Kind.ConsumerGroup, ConsumerId.of(id));
}

public static Consumer group(ConsumerId id) {
return new Consumer(Kind.ConsumerGroup, id);
}

public enum Kind {
Consumer(1), ConsumerGroup(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@

public class BigIntegerMessageId implements MessageId {

private static final BigIntegerMessageId DEFAULT_ID = new BigIntegerMessageId(BigInteger.ZERO);
private final BigInteger value;

public BigIntegerMessageId(BigInteger value) {
this.value = value;
}

public static BigIntegerMessageId defaultId() {
return DEFAULT_ID;
}

@Override
public BigInteger toBigInteger() {
return value;
Expand Down
5 changes: 5 additions & 0 deletions java-sdk/src/main/java/rs/iggy/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ public record Message(
byte[] payload,
Optional<Map<String, HeaderValue>> headers
) {

public static Message of(String payload) {
return new Message(MessageId.serverGenerated(), payload.getBytes(), Optional.empty());
}

}
4 changes: 4 additions & 0 deletions java-sdk/src/main/java/rs/iggy/message/MessageId.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

public interface MessageId {

static MessageId serverGenerated() {
return BigIntegerMessageId.defaultId();
}

@JsonCreator
static MessageId from(BigInteger bigInteger) {
return new BigIntegerMessageId(bigInteger);
Expand Down
21 changes: 21 additions & 0 deletions java-sdk/src/main/java/rs/iggy/message/PollingStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,25 @@
import java.math.BigInteger;

public record PollingStrategy(PollingKind kind, BigInteger value) {

public static PollingStrategy offset(BigInteger value) {
return new PollingStrategy(PollingKind.Offset, value);
}

public static PollingStrategy timestamp(BigInteger value) {
return new PollingStrategy(PollingKind.Timestamp, value);
}

public static PollingStrategy first() {
return new PollingStrategy(PollingKind.First, BigInteger.ZERO);
}

public static PollingStrategy last() {
return new PollingStrategy(PollingKind.Last, BigInteger.ZERO);
}

public static PollingStrategy next() {
return new PollingStrategy(PollingKind.Next, BigInteger.ZERO);
}

}

0 comments on commit 4132255

Please sign in to comment.