Skip to content

Commit

Permalink
Implement new types for message ids
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 16, 2024
1 parent b7eb9a1 commit c27760f
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static Message readMessage(ByteBuf response) {
var stateCode = response.readByte();
var state = MessageState.fromCode(stateCode);
var timestamp = readU64AsBigInteger(response);
var id = readU128AsBigInteger(response);
var id = readBytesMessageId(response);
var checksum = response.readUnsignedIntLE();
var headersLength = response.readUnsignedIntLE();
var headers = Optional.<Map<String, HeaderValue>>empty();
Expand Down Expand Up @@ -370,11 +370,11 @@ private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
return new BigInteger(bytesArray);
}

private static BigInteger readU128AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[17];
buffer.readBytes(bytesArray, 0, 16);
private static BytesMessageId readBytesMessageId(ByteBuf buffer) {
var bytesArray = new byte[16];
buffer.readBytes(bytesArray);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
return new BytesMessageId(bytesArray);
}

private static int toInt(Long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.math.BigInteger;
import java.util.Map;

final class BytesSerializer {
public final class BytesSerializer {

private BytesSerializer() {
}
Expand Down Expand Up @@ -63,7 +63,7 @@ static ByteBuf toBytes(Partitioning partitioning) {

static ByteBuf toBytes(MessageToSend message) {
var buffer = Unpooled.buffer();
buffer.writeBytes(toBytesAsU128(message.id()));
buffer.writeBytes(message.id().toBytes());
message.headers().ifPresentOrElse((headers) -> {
var headersBytes = toBytes(headers);
buffer.writeIntLE(headersBytes.readableBytes());
Expand Down Expand Up @@ -180,7 +180,7 @@ static ByteBuf toBytesAsU64(BigInteger value) {
return buffer;
}

static ByteBuf toBytesAsU128(BigInteger value) {
public static ByteBuf toBytesAsU128(BigInteger value) {
if (value.signum() == -1) {
throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 128: " + value);
}
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/rs/iggy/message/BigIntegerMessageId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package rs.iggy.message;

import io.netty.buffer.ByteBuf;
import rs.iggy.clients.blocking.tcp.BytesSerializer;
import java.math.BigInteger;

public class BigIntegerMessageId implements MessageId {

private final BigInteger value;

public BigIntegerMessageId(BigInteger value) {
this.value = value;
}

@Override
public BigInteger toBigInteger() {
return value;
}

public ByteBuf toBytes() {
return BytesSerializer.toBytesAsU128(value);
}

@Override
public String toString() {
return value.toString();
}

}
34 changes: 34 additions & 0 deletions src/main/java/rs/iggy/message/BytesMessageId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package rs.iggy.message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.math.BigInteger;
import java.util.Arrays;

public class BytesMessageId implements MessageId {

private final byte[] value;

public BytesMessageId(byte[] value) {
if (value.length != 16) {
throw new IllegalArgumentException("Message id must have 16 bytes");
}
this.value = value;
}

@Override
public BigInteger toBigInteger() {
return new BigInteger(1, value);
}

@Override
public ByteBuf toBytes() {
return Unpooled.wrappedBuffer(value);
}

@Override
public String toString() {
return Arrays.toString(value);
}

}
2 changes: 1 addition & 1 deletion src/main/java/rs/iggy/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public record Message(
BigInteger offset,
MessageState state,
BigInteger timestamp,
BigInteger id,
MessageId id,
Long checksum,
Optional<Map<String, HeaderValue>> headers,
byte[] payload
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/rs/iggy/message/MessageId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rs.iggy.message;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import io.netty.buffer.ByteBuf;
import java.math.BigInteger;

public interface MessageId {

@JsonCreator
static MessageId from(BigInteger bigInteger) {
return new BigIntegerMessageId(bigInteger);
}

@JsonValue
BigInteger toBigInteger();

ByteBuf toBytes();

}
3 changes: 1 addition & 2 deletions src/main/java/rs/iggy/message/MessageToSend.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package rs.iggy.message;

import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;

public record MessageToSend(
BigInteger id,
MessageId id,
byte[] payload,
Optional<Map<String, HeaderValue>> headers
) {
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/rs/iggy/message/UuidMessageId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package rs.iggy.message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidMessageId implements MessageId {

private final UUID value;

public UuidMessageId(UUID value) {
this.value = value;
}

@Override
public BigInteger toBigInteger() {
ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
buffer.putLong(value.getMostSignificantBits());
buffer.putLong(value.getLeastSignificantBits());
return new BigInteger(1, buffer.array());
}

public ByteBuf toBytes() {
var buffer = Unpooled.buffer(16, 16);
buffer.writeLongLE(value.getLeastSignificantBits());
buffer.writeLongLE(value.getMostSignificantBits());
return buffer;
}

@Override
public String toString() {
return value.toString();
}

}
23 changes: 4 additions & 19 deletions src/test/java/rs/iggy/clients/blocking/MessagesClientBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PollingKind;
import rs.iggy.message.PollingStrategy;
import rs.iggy.message.*;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import static java.util.Optional.empty;
Expand All @@ -32,7 +28,7 @@ void shouldSendAndGetMessages() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.partitionId(1L),
List.of(new MessageToSend(getRandomId(), text.getBytes(), empty())));
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand All @@ -49,7 +45,7 @@ void shouldSendMessageWithBalancedPartitioning() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.balanced(),
List.of(new MessageToSend(getRandomId(), text.getBytes(), empty())));
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand All @@ -66,7 +62,7 @@ void shouldSendMessageWithMessageKeyPartitioning() {
// when
String text = "message from java sdk";
messagesClient.sendMessages(42L, 42L, Partitioning.messagesKey("test-key"),
List.of(new MessageToSend(getRandomId(), text.getBytes(), empty())));
List.of(new MessageToSend(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));

var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
Expand All @@ -75,15 +71,4 @@ void shouldSendMessageWithMessageKeyPartitioning() {
assertThat(polledMessages.messages()).hasSize(1);
}

private static BigInteger getRandomId() {
return new BigInteger(1, uuidToBytes(UUID.randomUUID()));
}

private static byte[] uuidToBytes(UUID uuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.array();
}

}

0 comments on commit c27760f

Please sign in to comment.