From c27760f91f536f92eb122d699de71df0a5d9f30e Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Wed, 16 Oct 2024 07:50:53 +0200 Subject: [PATCH] Implement new types for message ids --- .../blocking/tcp/BytesDeserializer.java | 10 ++--- .../clients/blocking/tcp/BytesSerializer.java | 6 +-- .../rs/iggy/message/BigIntegerMessageId.java | 29 +++++++++++++++ .../java/rs/iggy/message/BytesMessageId.java | 34 +++++++++++++++++ src/main/java/rs/iggy/message/Message.java | 2 +- src/main/java/rs/iggy/message/MessageId.java | 20 ++++++++++ .../java/rs/iggy/message/MessageToSend.java | 3 +- .../java/rs/iggy/message/UuidMessageId.java | 37 +++++++++++++++++++ .../blocking/MessagesClientBaseTest.java | 23 ++---------- 9 files changed, 134 insertions(+), 30 deletions(-) create mode 100644 src/main/java/rs/iggy/message/BigIntegerMessageId.java create mode 100644 src/main/java/rs/iggy/message/BytesMessageId.java create mode 100644 src/main/java/rs/iggy/message/MessageId.java create mode 100644 src/main/java/rs/iggy/message/UuidMessageId.java diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java index 1ca9f37..20196cf 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java @@ -150,7 +150,7 @@ static Message readMessage(ByteBuf response) { var stateCode = response.readByte(); var state = MessageState.fromCode(stateCode); var timestamp = readU64AsBigInteger(response); - var id = readU128AsBigInteger(response); + var id = readBytesMessageId(response); var checksum = response.readUnsignedIntLE(); var headersLength = response.readUnsignedIntLE(); var headers = Optional.>empty(); @@ -370,11 +370,11 @@ private static BigInteger readU64AsBigInteger(ByteBuf buffer) { return new BigInteger(bytesArray); } - private static BigInteger readU128AsBigInteger(ByteBuf buffer) { - var bytesArray = new byte[17]; - buffer.readBytes(bytesArray, 0, 16); + private static BytesMessageId readBytesMessageId(ByteBuf buffer) { + var bytesArray = new byte[16]; + buffer.readBytes(bytesArray); ArrayUtils.reverse(bytesArray); - return new BigInteger(bytesArray); + return new BytesMessageId(bytesArray); } private static int toInt(Long size) { diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java index 05272c7..e6f43c6 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java @@ -16,7 +16,7 @@ import java.math.BigInteger; import java.util.Map; -final class BytesSerializer { +public final class BytesSerializer { private BytesSerializer() { } @@ -63,7 +63,7 @@ static ByteBuf toBytes(Partitioning partitioning) { static ByteBuf toBytes(MessageToSend message) { var buffer = Unpooled.buffer(); - buffer.writeBytes(toBytesAsU128(message.id())); + buffer.writeBytes(message.id().toBytes()); message.headers().ifPresentOrElse((headers) -> { var headersBytes = toBytes(headers); buffer.writeIntLE(headersBytes.readableBytes()); @@ -180,7 +180,7 @@ static ByteBuf toBytesAsU64(BigInteger value) { return buffer; } - static ByteBuf toBytesAsU128(BigInteger value) { + public static ByteBuf toBytesAsU128(BigInteger value) { if (value.signum() == -1) { throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 128: " + value); } diff --git a/src/main/java/rs/iggy/message/BigIntegerMessageId.java b/src/main/java/rs/iggy/message/BigIntegerMessageId.java new file mode 100644 index 0000000..e1df6ad --- /dev/null +++ b/src/main/java/rs/iggy/message/BigIntegerMessageId.java @@ -0,0 +1,29 @@ +package rs.iggy.message; + +import io.netty.buffer.ByteBuf; +import rs.iggy.clients.blocking.tcp.BytesSerializer; +import java.math.BigInteger; + +public class BigIntegerMessageId implements MessageId { + + private final BigInteger value; + + public BigIntegerMessageId(BigInteger value) { + this.value = value; + } + + @Override + public BigInteger toBigInteger() { + return value; + } + + public ByteBuf toBytes() { + return BytesSerializer.toBytesAsU128(value); + } + + @Override + public String toString() { + return value.toString(); + } + +} diff --git a/src/main/java/rs/iggy/message/BytesMessageId.java b/src/main/java/rs/iggy/message/BytesMessageId.java new file mode 100644 index 0000000..af47f0a --- /dev/null +++ b/src/main/java/rs/iggy/message/BytesMessageId.java @@ -0,0 +1,34 @@ +package rs.iggy.message; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.math.BigInteger; +import java.util.Arrays; + +public class BytesMessageId implements MessageId { + + private final byte[] value; + + public BytesMessageId(byte[] value) { + if (value.length != 16) { + throw new IllegalArgumentException("Message id must have 16 bytes"); + } + this.value = value; + } + + @Override + public BigInteger toBigInteger() { + return new BigInteger(1, value); + } + + @Override + public ByteBuf toBytes() { + return Unpooled.wrappedBuffer(value); + } + + @Override + public String toString() { + return Arrays.toString(value); + } + +} diff --git a/src/main/java/rs/iggy/message/Message.java b/src/main/java/rs/iggy/message/Message.java index 75a73ea..8267ed8 100644 --- a/src/main/java/rs/iggy/message/Message.java +++ b/src/main/java/rs/iggy/message/Message.java @@ -8,7 +8,7 @@ public record Message( BigInteger offset, MessageState state, BigInteger timestamp, - BigInteger id, + MessageId id, Long checksum, Optional> headers, byte[] payload diff --git a/src/main/java/rs/iggy/message/MessageId.java b/src/main/java/rs/iggy/message/MessageId.java new file mode 100644 index 0000000..9c1c8ca --- /dev/null +++ b/src/main/java/rs/iggy/message/MessageId.java @@ -0,0 +1,20 @@ +package rs.iggy.message; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import io.netty.buffer.ByteBuf; +import java.math.BigInteger; + +public interface MessageId { + + @JsonCreator + static MessageId from(BigInteger bigInteger) { + return new BigIntegerMessageId(bigInteger); + } + + @JsonValue + BigInteger toBigInteger(); + + ByteBuf toBytes(); + +} diff --git a/src/main/java/rs/iggy/message/MessageToSend.java b/src/main/java/rs/iggy/message/MessageToSend.java index 9fef992..6e5e777 100644 --- a/src/main/java/rs/iggy/message/MessageToSend.java +++ b/src/main/java/rs/iggy/message/MessageToSend.java @@ -1,11 +1,10 @@ package rs.iggy.message; -import java.math.BigInteger; import java.util.Map; import java.util.Optional; public record MessageToSend( - BigInteger id, + MessageId id, byte[] payload, Optional> headers ) { diff --git a/src/main/java/rs/iggy/message/UuidMessageId.java b/src/main/java/rs/iggy/message/UuidMessageId.java new file mode 100644 index 0000000..49e0e5c --- /dev/null +++ b/src/main/java/rs/iggy/message/UuidMessageId.java @@ -0,0 +1,37 @@ +package rs.iggy.message; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.UUID; + +public class UuidMessageId implements MessageId { + + private final UUID value; + + public UuidMessageId(UUID value) { + this.value = value; + } + + @Override + public BigInteger toBigInteger() { + ByteBuffer buffer = ByteBuffer.wrap(new byte[16]); + buffer.putLong(value.getMostSignificantBits()); + buffer.putLong(value.getLeastSignificantBits()); + return new BigInteger(1, buffer.array()); + } + + public ByteBuf toBytes() { + var buffer = Unpooled.buffer(16, 16); + buffer.writeLongLE(value.getLeastSignificantBits()); + buffer.writeLongLE(value.getMostSignificantBits()); + return buffer; + } + + @Override + public String toString() { + return value.toString(); + } + +} diff --git a/src/test/java/rs/iggy/clients/blocking/MessagesClientBaseTest.java b/src/test/java/rs/iggy/clients/blocking/MessagesClientBaseTest.java index 9908023..3c28b8e 100644 --- a/src/test/java/rs/iggy/clients/blocking/MessagesClientBaseTest.java +++ b/src/test/java/rs/iggy/clients/blocking/MessagesClientBaseTest.java @@ -2,12 +2,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import rs.iggy.message.MessageToSend; -import rs.iggy.message.Partitioning; -import rs.iggy.message.PollingKind; -import rs.iggy.message.PollingStrategy; +import rs.iggy.message.*; import java.math.BigInteger; -import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; import static java.util.Optional.empty; @@ -32,7 +28,7 @@ void shouldSendAndGetMessages() { // when String text = "message from java sdk"; messagesClient.sendMessages(42L, 42L, Partitioning.partitionId(1L), - List.of(new MessageToSend(getRandomId(), text.getBytes(), empty()))); + List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty()))); var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L, new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false); @@ -49,7 +45,7 @@ void shouldSendMessageWithBalancedPartitioning() { // when String text = "message from java sdk"; messagesClient.sendMessages(42L, 42L, Partitioning.balanced(), - List.of(new MessageToSend(getRandomId(), text.getBytes(), empty()))); + List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty()))); var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L, new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false); @@ -66,7 +62,7 @@ void shouldSendMessageWithMessageKeyPartitioning() { // when String text = "message from java sdk"; messagesClient.sendMessages(42L, 42L, Partitioning.messagesKey("test-key"), - List.of(new MessageToSend(getRandomId(), text.getBytes(), empty()))); + List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty()))); var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L, new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false); @@ -75,15 +71,4 @@ void shouldSendMessageWithMessageKeyPartitioning() { assertThat(polledMessages.messages()).hasSize(1); } - private static BigInteger getRandomId() { - return new BigInteger(1, uuidToBytes(UUID.randomUUID())); - } - - private static byte[] uuidToBytes(UUID uuid) { - ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); - byteBuffer.putLong(uuid.getMostSignificantBits()); - byteBuffer.putLong(uuid.getLeastSignificantBits()); - return byteBuffer.array(); - } - }