Skip to content

Commit

Permalink
Implement personal access tokens for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 12, 2024
1 parent 6288c4e commit 500b481
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import rs.iggy.consumeroffset.ConsumerOffsetInfo;
import rs.iggy.message.*;
import rs.iggy.partition.Partition;
import rs.iggy.personalaccesstoken.PersonalAccessTokenInfo;
import rs.iggy.personalaccesstoken.RawPersonalAccessToken;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
import rs.iggy.system.ClientInfo;
Expand Down Expand Up @@ -297,7 +299,13 @@ static StreamPermissions readStreamPermissions(ByteBuf response) {
var topicPermissions = readTopicPermissions(response);
topicPermissionsMap.put(topicId, topicPermissions);
}
return new StreamPermissions(manageStream, readStream, manageTopics, readTopics, pollMessages, sendMessages, topicPermissionsMap);
return new StreamPermissions(manageStream,
readStream,
manageTopics,
readTopics,
pollMessages,
sendMessages,
topicPermissionsMap);
}

static TopicPermissions readTopicPermissions(ByteBuf response) {
Expand Down Expand Up @@ -341,6 +349,20 @@ static UserInfo readUserInfo(ByteBuf response) {
return new UserInfo(userId, createdAt, status, username);
}

static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) {
var tokenLength = response.readByte();
var token = response.readCharSequence(tokenLength, StandardCharsets.UTF_8).toString();
return new RawPersonalAccessToken(token);
}

static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) {
var nameLength = response.readByte();
var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
var expiry = readU64AsBigInteger(response);
Optional<BigInteger> expiryOptional = expiry.equals(BigInteger.ZERO) ? Optional.empty() : Optional.of(expiry);
return new PersonalAccessTokenInfo(name, expiryOptional);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class IggyTcpClient implements IggyClient {
private final ConsumerOffsetTcpClient consumerOffsetsClient;
private final MessagesTcpClient messagesClient;
private final SystemTcpClient systemClient;
private final PersonalAccessTokensTcpClient personalAccessTokensClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
Expand All @@ -23,6 +24,7 @@ public IggyTcpClient(String host, Integer port) {
consumerOffsetsClient = new ConsumerOffsetTcpClient(connection);
messagesClient = new MessagesTcpClient(connection);
systemClient = new SystemTcpClient(connection);
personalAccessTokensClient = new PersonalAccessTokensTcpClient(connection);
}

@Override
Expand Down Expand Up @@ -67,7 +69,7 @@ public MessagesClient messages() {

@Override
public PersonalAccessTokensClient personalAccessTokens() {
throw new UnsupportedOperationException();
return personalAccessTokensClient;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.Unpooled;
import rs.iggy.clients.blocking.PersonalAccessTokensClient;
import rs.iggy.personalaccesstoken.PersonalAccessTokenInfo;
import rs.iggy.personalaccesstoken.RawPersonalAccessToken;
import rs.iggy.user.IdentityInfo;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readPersonalAccessTokenInfo;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readRawPersonalAccessToken;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytesAsU64;

class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {

private static final int GET_PERSONAL_ACCESS_TOKENS_CODE = 41;
private static final int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42;
private static final int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
private static final int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;

private final TcpConnectionHandler connection;

public PersonalAccessTokensTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

@Override
public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) {
var payload = Unpooled.buffer();
payload.writeBytes(nameToBytes(name));
payload.writeBytes(toBytesAsU64(expiry));
var response = connection.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload);
return readRawPersonalAccessToken(response);
}

@Override
public List<PersonalAccessTokenInfo> getPersonalAccessTokens() {
var response = connection.send(GET_PERSONAL_ACCESS_TOKENS_CODE);
var tokens = new ArrayList<PersonalAccessTokenInfo>();
while (response.isReadable()) {
tokens.add(readPersonalAccessTokenInfo(response));
}
return tokens;
}

@Override
public void deletePersonalAccessToken(String name) {
var payload = nameToBytes(name);
connection.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload);
}

@Override
public IdentityInfo loginWithPersonalAccessToken(String token) {
var payload = nameToBytes(token);
var response = connection.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload);
var userId = response.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rs.iggy.clients.blocking.tcp;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.PersonalAccessTokensBaseTest;

class PersonalAccessTokensTcpClientTest extends PersonalAccessTokensBaseTest {

@Override
protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

}

0 comments on commit 500b481

Please sign in to comment.