Skip to content

Commit

Permalink
Rename message classes to align with server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 23, 2024
1 parent 583abcb commit 2c9b5d9
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import rs.iggy.identifier.ConsumerId;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.Message;
import rs.iggy.message.PolledMessage;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingKind;
import rs.iggy.message.PollingStrategy;
Expand Down Expand Up @@ -40,7 +40,7 @@ public static void main(String[] args) {
createConsumerGroup(client);
client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID);

List<Message> messages = new ArrayList<>();
List<PolledMessage> messages = new ArrayList<>();
while (messages.size() < 1000) {
PolledMessages polledMessages = client.messages()
.pollMessages(STREAM_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.BigIntegerMessageId;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PartitioningKind;
import rs.iggy.stream.StreamDetails;
Expand Down Expand Up @@ -34,7 +34,7 @@ public static void main(String[] args) {
int counter = 0;
while (counter++ < 1000) {
var text = "message from simple producer " + counter;
var message = new MessageToSend(new BigIntegerMessageId(BigInteger.ZERO), text.getBytes(), empty());
var message = new Message(new BigIntegerMessageId(BigInteger.ZERO), text.getBytes(), empty());
client.messages()
.sendMessages(STREAM_ID,
TOPIC_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingStrategy;
Expand All @@ -19,10 +19,10 @@ default PolledMessages pollMessages(Long streamId, Long topicId, Optional<Long>

PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer, PollingStrategy strategy, Long count, boolean autoCommit);

default void sendMessages(Long streamId, Long topicId, Partitioning partitioning, List<MessageToSend> messages) {
default void sendMessages(Long streamId, Long topicId, Partitioning partitioning, List<Message> messages) {
sendMessages(StreamId.of(streamId), TopicId.of(topicId), partitioning, messages);
}

void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages);
void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<Message> messages);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingStrategy;
Expand Down Expand Up @@ -33,7 +33,7 @@ public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<
}

@Override
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages) {
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<Message> messages) {
var request = httpClient.preparePostRequest(path(streamId, topicId), new SendMessages(partitioning, messages));
httpClient.execute(request);
}
Expand All @@ -42,6 +42,6 @@ private static String path(StreamId streamId, TopicId topicId) {
return "/streams/" + streamId + "/topics/" + topicId + "/messages";
}

private record SendMessages(Partitioning partitioning, List<MessageToSend> messages) {
private record SendMessages(Partitioning partitioning, List<Message> messages) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ public static PolledMessages readPolledMessages(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var currentOffset = readU64AsBigInteger(response);
var _messagesCount = response.readUnsignedIntLE();
var messages = new ArrayList<Message>();
var messages = new ArrayList<PolledMessage>();
while (response.isReadable()) {
messages.add(readMessage(response));
messages.add(readPolledMessage(response));
}
return new PolledMessages(partitionId, currentOffset, messages);
}

static Message readMessage(ByteBuf response) {
static PolledMessage readPolledMessage(ByteBuf response) {
var offset = readU64AsBigInteger(response);
var stateCode = response.readByte();
var state = MessageState.fromCode(stateCode);
Expand All @@ -171,7 +171,7 @@ static Message readMessage(ByteBuf response) {
var payloadLength = response.readUnsignedIntLE();
var payload = newByteArray(payloadLength);
response.readBytes(payload);
return new Message(offset, state, timestamp, id, checksum, headers, payload);
return new PolledMessage(offset, state, timestamp, id, checksum, headers, payload);
}

static Stats readStats(ByteBuf response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.Identifier;
import rs.iggy.message.HeaderValue;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PollingStrategy;
import rs.iggy.user.GlobalPermissions;
Expand Down Expand Up @@ -61,7 +61,7 @@ static ByteBuf toBytes(Partitioning partitioning) {
return buffer;
}

static ByteBuf toBytes(MessageToSend message) {
static ByteBuf toBytes(Message message) {
var buffer = Unpooled.buffer();
buffer.writeBytes(message.id().toBytes());
message.headers().ifPresentOrElse((headers) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Message;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingStrategy;
Expand Down Expand Up @@ -39,7 +39,7 @@ public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<
}

@Override
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages) {
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<Message> messages) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(partitioning));
Expand Down
9 changes: 2 additions & 7 deletions java-sdk/src/main/java/rs/iggy/message/Message.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package rs.iggy.message;

import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;

public record Message(
BigInteger offset,
MessageState state,
BigInteger timestamp,
MessageId id,
Long checksum,
Optional<Map<String, HeaderValue>> headers,
byte[] payload
byte[] payload,
Optional<Map<String, HeaderValue>> headers
) {
}
11 changes: 0 additions & 11 deletions java-sdk/src/main/java/rs/iggy/message/MessageToSend.java

This file was deleted.

16 changes: 16 additions & 0 deletions java-sdk/src/main/java/rs/iggy/message/PolledMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package rs.iggy.message;

import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;

public record PolledMessage(
BigInteger offset,
MessageState state,
BigInteger timestamp,
MessageId id,
Long checksum,
Optional<Map<String, HeaderValue>> headers,
byte[] payload
) {
}
2 changes: 1 addition & 1 deletion java-sdk/src/main/java/rs/iggy/message/PolledMessages.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
public record PolledMessages(
Long partitionId,
BigInteger currentOffset,
List<Message> messages
List<PolledMessage> messages
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void shouldSendAndGetMessages() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.partitionId(1L),
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand All @@ -45,7 +45,7 @@ void shouldSendMessageWithBalancedPartitioning() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.balanced(),
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand All @@ -62,7 +62,7 @@ void shouldSendMessageWithMessageKeyPartitioning() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.messagesKey("test-key"),
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand Down

0 comments on commit 2c9b5d9

Please sign in to comment.