From 500b481925ab04a2d5f897673ac9f3c0c491daf6 Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Sat, 12 Oct 2024 11:30:07 +0200 Subject: [PATCH] Implement personal access tokens for TCP --- .../blocking/tcp/BytesDeserializer.java | 24 ++++++- .../clients/blocking/tcp/IggyTcpClient.java | 4 +- .../tcp/PersonalAccessTokensTcpClient.java | 62 +++++++++++++++++++ .../PersonalAccessTokensTcpClientTest.java | 13 ++++ 4 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java create mode 100644 src/test/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClientTest.java diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java index 3a5da1a..1ca9f37 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java @@ -8,6 +8,8 @@ import rs.iggy.consumeroffset.ConsumerOffsetInfo; import rs.iggy.message.*; import rs.iggy.partition.Partition; +import rs.iggy.personalaccesstoken.PersonalAccessTokenInfo; +import rs.iggy.personalaccesstoken.RawPersonalAccessToken; import rs.iggy.stream.StreamBase; import rs.iggy.stream.StreamDetails; import rs.iggy.system.ClientInfo; @@ -297,7 +299,13 @@ static StreamPermissions readStreamPermissions(ByteBuf response) { var topicPermissions = readTopicPermissions(response); topicPermissionsMap.put(topicId, topicPermissions); } - return new StreamPermissions(manageStream, readStream, manageTopics, readTopics, pollMessages, sendMessages, topicPermissionsMap); + return new StreamPermissions(manageStream, + readStream, + manageTopics, + readTopics, + pollMessages, + sendMessages, + topicPermissionsMap); } static TopicPermissions readTopicPermissions(ByteBuf response) { @@ -341,6 +349,20 @@ static UserInfo readUserInfo(ByteBuf response) { return new UserInfo(userId, createdAt, status, username); } + static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) { + var tokenLength = response.readByte(); + var token = response.readCharSequence(tokenLength, StandardCharsets.UTF_8).toString(); + return new RawPersonalAccessToken(token); + } + + static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) { + var nameLength = response.readByte(); + var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); + var expiry = readU64AsBigInteger(response); + Optional expiryOptional = expiry.equals(BigInteger.ZERO) ? Optional.empty() : Optional.of(expiry); + return new PersonalAccessTokenInfo(name, expiryOptional); + } + private static BigInteger readU64AsBigInteger(ByteBuf buffer) { var bytesArray = new byte[9]; buffer.readBytes(bytesArray, 0, 8); diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java index ba1f968..ee4fc0c 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java @@ -12,6 +12,7 @@ public class IggyTcpClient implements IggyClient { private final ConsumerOffsetTcpClient consumerOffsetsClient; private final MessagesTcpClient messagesClient; private final SystemTcpClient systemClient; + private final PersonalAccessTokensTcpClient personalAccessTokensClient; public IggyTcpClient(String host, Integer port) { TcpConnectionHandler connection = new TcpConnectionHandler(host, port); @@ -23,6 +24,7 @@ public IggyTcpClient(String host, Integer port) { consumerOffsetsClient = new ConsumerOffsetTcpClient(connection); messagesClient = new MessagesTcpClient(connection); systemClient = new SystemTcpClient(connection); + personalAccessTokensClient = new PersonalAccessTokensTcpClient(connection); } @Override @@ -67,7 +69,7 @@ public MessagesClient messages() { @Override public PersonalAccessTokensClient personalAccessTokens() { - throw new UnsupportedOperationException(); + return personalAccessTokensClient; } } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java new file mode 100644 index 0000000..ea587d0 --- /dev/null +++ b/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java @@ -0,0 +1,62 @@ +package rs.iggy.clients.blocking.tcp; + +import io.netty.buffer.Unpooled; +import rs.iggy.clients.blocking.PersonalAccessTokensClient; +import rs.iggy.personalaccesstoken.PersonalAccessTokenInfo; +import rs.iggy.personalaccesstoken.RawPersonalAccessToken; +import rs.iggy.user.IdentityInfo; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readPersonalAccessTokenInfo; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readRawPersonalAccessToken; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytesAsU64; + +class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { + + private static final int GET_PERSONAL_ACCESS_TOKENS_CODE = 41; + private static final int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42; + private static final int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43; + private static final int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44; + + private final TcpConnectionHandler connection; + + public PersonalAccessTokensTcpClient(TcpConnectionHandler connection) { + this.connection = connection; + } + + @Override + public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) { + var payload = Unpooled.buffer(); + payload.writeBytes(nameToBytes(name)); + payload.writeBytes(toBytesAsU64(expiry)); + var response = connection.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload); + return readRawPersonalAccessToken(response); + } + + @Override + public List getPersonalAccessTokens() { + var response = connection.send(GET_PERSONAL_ACCESS_TOKENS_CODE); + var tokens = new ArrayList(); + while (response.isReadable()) { + tokens.add(readPersonalAccessTokenInfo(response)); + } + return tokens; + } + + @Override + public void deletePersonalAccessToken(String name) { + var payload = nameToBytes(name); + connection.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload); + } + + @Override + public IdentityInfo loginWithPersonalAccessToken(String token) { + var payload = nameToBytes(token); + var response = connection.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload); + var userId = response.readUnsignedIntLE(); + return new IdentityInfo(userId, Optional.empty()); + } +} diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClientTest.java new file mode 100644 index 0000000..4bce838 --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClientTest.java @@ -0,0 +1,13 @@ +package rs.iggy.clients.blocking.tcp; + +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.PersonalAccessTokensBaseTest; + +class PersonalAccessTokensTcpClientTest extends PersonalAccessTokensBaseTest { + + @Override + protected IggyClient getClient() { + return TcpClientFactory.create(iggyServer); + } + +}