Skip to content

Commit

Permalink
Implement update method for topic
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 11, 2024
1 parent 5d8e483 commit 5f8949b
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 23 deletions.
44 changes: 31 additions & 13 deletions src/main/java/rs/iggy/clients/blocking/TopicsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,57 @@ default List<Topic> getTopics(Long streamId) {

List<Topic> getTopics(StreamId streamId);

default void createTopic(Long streamId,
default TopicDetails createTopic(Long streamId,
Optional<Long> topicId,
Long partitionsCount,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
return createTopic(StreamId.of(streamId),
topicId,
partitionsCount,
compressionAlgorithm,
messageExpiry,
maxTopicSize,
replicationFactor,
name);
}

TopicDetails createTopic(StreamId streamId,
Optional<Long> topicId,
Long partitionsCount,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name);

default void updateTopic(Long streamId,
Long topicId,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
createTopic(StreamId.of(streamId),
topicId,
partitionsCount,
updateTopic(StreamId.of(streamId),
TopicId.of(topicId),
compressionAlgorithm,
messageExpiry,
maxTopicSize,
replicationFactor,
name);
}

TopicDetails createTopic(StreamId streamId,
Optional<Long> topicId,
Long partitionsCount,
void updateTopic(StreamId streamId,
TopicId topicId,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name);

default void updateTopic(Long streamId, Long topicId, Optional<Long> messageExpiry, String name) {
updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name);
}

void updateTopic(StreamId streamId, TopicId topicId, Optional<Long> messageExpiry, String name);

default void deleteTopic(Long streamId, Long topicId) {
deleteTopic(StreamId.of(streamId), TopicId.of(topicId));
}
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ public TopicDetails createTopic(StreamId streamId,
}

@Override
public void updateTopic(StreamId streamId, TopicId topicId, Optional<Long> messageExpiry, String name) {
public void updateTopic(StreamId streamId,
TopicId topicId,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
var request = httpClient.preparePutRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId,
new UpdateTopic(messageExpiry, name));
new UpdateTopic(compressionAlgorithm, messageExpiry, maxTopicSize, replicationFactor, name));
httpClient.execute(request);
}

Expand All @@ -79,6 +85,12 @@ record CreateTopic(
) {
}

record UpdateTopic(Optional<Long> messageExpiry, String name) {
record UpdateTopic(
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name
) {
}
}
27 changes: 20 additions & 7 deletions src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import java.util.Optional;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopic;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopicDetails;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.*;

class TopicsTcpClient implements TopicsClient {

Expand Down Expand Up @@ -58,19 +57,33 @@ public TopicDetails createTopic(StreamId streamId, Optional<Long> topicId, Long
payload.writeIntLE(topicId.orElse(0L).intValue());
payload.writeIntLE(partitionsCount.intValue());
payload.writeByte(compressionAlgorithm.asCode());
payload.writeBytes(BytesSerializer.toBytesAsU64(messageExpiry));
payload.writeBytes(BytesSerializer.toBytesAsU64(maxTopicSize));
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
payload.writeBytes(nameToBytes(name));

var response = connection.send(CREATE_TOPIC_CODE, payload);
return readTopicDetails(response);
}


@Override
public void updateTopic(StreamId streamId, TopicId topicId, Optional<Long> messageExpiry, String name) {
throw new UnsupportedOperationException();
public void updateTopic(StreamId streamId,
TopicId topicId,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
var payload = Unpooled.buffer();
payload.writeBytes(toBytes(streamId));
payload.writeBytes(toBytes(topicId));
payload.writeByte(compressionAlgorithm.asCode());
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
payload.writeBytes(nameToBytes(name));

connection.send(UPDATE_TOPIC_CODE, payload);
}

@Override
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/rs/iggy/clients/blocking/TopicsClientBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.topic.CompressionAlgorithm;
import rs.iggy.topic.TopicDetails;
import java.math.BigInteger;
import static java.util.Optional.empty;
import static java.util.Optional.of;
Expand Down Expand Up @@ -45,4 +48,32 @@ void shouldCreateAndDeleteTopic() {
assertThat(topicsClient.getTopics(42L)).isEmpty();
}

@Test
void shouldUpdateTopic() {
// given
var topic = topicsClient.createTopic(42L,
of(42L),
1L,
CompressionAlgorithm.none,
BigInteger.ZERO,
BigInteger.ZERO,
empty(),
"test-topic");


// when
topicsClient.updateTopic(StreamId.of(42L),
TopicId.of(topic.id()),
CompressionAlgorithm.none,
BigInteger.valueOf(5000),
BigInteger.ZERO,
empty(),
"new-name");

// then
TopicDetails updatedTopic = topicsClient.getTopic(42L, 42L);
assertThat(updatedTopic.name()).isEqualTo("new-name");
assertThat(updatedTopic.messageExpiry()).isEqualTo(BigInteger.valueOf(5000));
}

}

0 comments on commit 5f8949b

Please sign in to comment.