From 5f8949b6659a4b73b995a7951249476e68c71c7e Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Fri, 11 Oct 2024 21:36:04 +0200 Subject: [PATCH] Implement update method for topic --- .../iggy/clients/blocking/TopicsClient.java | 44 +++++++++++++------ .../blocking/http/TopicsHttpClient.java | 18 ++++++-- .../clients/blocking/tcp/TopicsTcpClient.java | 27 +++++++++--- .../blocking/TopicsClientBaseTest.java | 31 +++++++++++++ 4 files changed, 97 insertions(+), 23 deletions(-) diff --git a/src/main/java/rs/iggy/clients/blocking/TopicsClient.java b/src/main/java/rs/iggy/clients/blocking/TopicsClient.java index fda7b96..ca4a0a3 100644 --- a/src/main/java/rs/iggy/clients/blocking/TopicsClient.java +++ b/src/main/java/rs/iggy/clients/blocking/TopicsClient.java @@ -23,17 +23,42 @@ default List getTopics(Long streamId) { List getTopics(StreamId streamId); - default void createTopic(Long streamId, + default TopicDetails createTopic(Long streamId, + Optional topicId, + Long partitionsCount, + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, + String name) { + return createTopic(StreamId.of(streamId), + topicId, + partitionsCount, + compressionAlgorithm, + messageExpiry, + maxTopicSize, + replicationFactor, + name); + } + + TopicDetails createTopic(StreamId streamId, Optional topicId, Long partitionsCount, CompressionAlgorithm compressionAlgorithm, BigInteger messageExpiry, BigInteger maxTopicSize, Optional replicationFactor, + String name); + + default void updateTopic(Long streamId, + Long topicId, + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, String name) { - createTopic(StreamId.of(streamId), - topicId, - partitionsCount, + updateTopic(StreamId.of(streamId), + TopicId.of(topicId), compressionAlgorithm, messageExpiry, maxTopicSize, @@ -41,21 +66,14 @@ default void createTopic(Long streamId, name); } - TopicDetails createTopic(StreamId streamId, - Optional topicId, - Long partitionsCount, + void updateTopic(StreamId streamId, + TopicId topicId, CompressionAlgorithm compressionAlgorithm, BigInteger messageExpiry, BigInteger maxTopicSize, Optional replicationFactor, String name); - default void updateTopic(Long streamId, Long topicId, Optional messageExpiry, String name) { - updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name); - } - - void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name); - default void deleteTopic(Long streamId, Long topicId) { deleteTopic(StreamId.of(streamId), TopicId.of(topicId)); } diff --git a/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java b/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java index 9871f3f..672a1cd 100644 --- a/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java @@ -56,9 +56,15 @@ public TopicDetails createTopic(StreamId streamId, } @Override - public void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name) { + public void updateTopic(StreamId streamId, + TopicId topicId, + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, + String name) { var request = httpClient.preparePutRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId, - new UpdateTopic(messageExpiry, name)); + new UpdateTopic(compressionAlgorithm, messageExpiry, maxTopicSize, replicationFactor, name)); httpClient.execute(request); } @@ -79,6 +85,12 @@ record CreateTopic( ) { } - record UpdateTopic(Optional messageExpiry, String name) { + record UpdateTopic( + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, + String name + ) { } } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java index 012e718..0eb60ff 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java @@ -13,8 +13,7 @@ import java.util.Optional; import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopic; import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopicDetails; -import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes; -import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.*; class TopicsTcpClient implements TopicsClient { @@ -58,8 +57,8 @@ public TopicDetails createTopic(StreamId streamId, Optional topicId, Long payload.writeIntLE(topicId.orElse(0L).intValue()); payload.writeIntLE(partitionsCount.intValue()); payload.writeByte(compressionAlgorithm.asCode()); - payload.writeBytes(BytesSerializer.toBytesAsU64(messageExpiry)); - payload.writeBytes(BytesSerializer.toBytesAsU64(maxTopicSize)); + payload.writeBytes(toBytesAsU64(messageExpiry)); + payload.writeBytes(toBytesAsU64(maxTopicSize)); payload.writeByte(replicationFactor.orElse((short) 0)); payload.writeBytes(nameToBytes(name)); @@ -67,10 +66,24 @@ public TopicDetails createTopic(StreamId streamId, Optional topicId, Long return readTopicDetails(response); } - @Override - public void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name) { - throw new UnsupportedOperationException(); + public void updateTopic(StreamId streamId, + TopicId topicId, + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, + String name) { + var payload = Unpooled.buffer(); + payload.writeBytes(toBytes(streamId)); + payload.writeBytes(toBytes(topicId)); + payload.writeByte(compressionAlgorithm.asCode()); + payload.writeBytes(toBytesAsU64(messageExpiry)); + payload.writeBytes(toBytesAsU64(maxTopicSize)); + payload.writeByte(replicationFactor.orElse((short) 0)); + payload.writeBytes(nameToBytes(name)); + + connection.send(UPDATE_TOPIC_CODE, payload); } @Override diff --git a/src/test/java/rs/iggy/clients/blocking/TopicsClientBaseTest.java b/src/test/java/rs/iggy/clients/blocking/TopicsClientBaseTest.java index c1acbec..aaf8551 100644 --- a/src/test/java/rs/iggy/clients/blocking/TopicsClientBaseTest.java +++ b/src/test/java/rs/iggy/clients/blocking/TopicsClientBaseTest.java @@ -2,7 +2,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import rs.iggy.identifier.StreamId; +import rs.iggy.identifier.TopicId; import rs.iggy.topic.CompressionAlgorithm; +import rs.iggy.topic.TopicDetails; import java.math.BigInteger; import static java.util.Optional.empty; import static java.util.Optional.of; @@ -45,4 +48,32 @@ void shouldCreateAndDeleteTopic() { assertThat(topicsClient.getTopics(42L)).isEmpty(); } + @Test + void shouldUpdateTopic() { + // given + var topic = topicsClient.createTopic(42L, + of(42L), + 1L, + CompressionAlgorithm.none, + BigInteger.ZERO, + BigInteger.ZERO, + empty(), + "test-topic"); + + + // when + topicsClient.updateTopic(StreamId.of(42L), + TopicId.of(topic.id()), + CompressionAlgorithm.none, + BigInteger.valueOf(5000), + BigInteger.ZERO, + empty(), + "new-name"); + + // then + TopicDetails updatedTopic = topicsClient.getTopic(42L, 42L); + assertThat(updatedTopic.name()).isEqualTo("new-name"); + assertThat(updatedTopic.messageExpiry()).isEqualTo(BigInteger.valueOf(5000)); + } + }