Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
Expand All @@ -35,10 +40,15 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NoRebalanceInProgressException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
Expand All @@ -60,6 +70,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);

/**
 * Adds a server tag to the specified tabletServers. One tabletServer can only have one
 * serverTag.
 *
 * <p>If adding the tag fails for any one tabletServer, none of the tags will take effect.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
 *       access to the cluster.
 *   <li>{@link ServerNotExistException} If a tabletServer in {@code tabletServers} does not
 *       exist.
 *   <li>{@link ServerTagAlreadyExistException} If the server tag already exists.
 * </ul>
 *
 * @param tabletServers the tabletServers we want to add the server tag to.
 * @param serverTag the server tag to be added.
 * @return A CompletableFuture indicating completion of the operation.
 */
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
 * Removes a server tag from the specified tabletServers.
 *
 * <p>If removing the tag fails for any one tabletServer, none of the tags will be removed.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
 *       access to the cluster.
 *   <li>{@link ServerNotExistException} If a tabletServer in {@code tabletServers} does not
 *       exist.
 *   <li>{@link ServerTagNotExistException} If the server tag does not exist.
 * </ul>
 *
 * @param tabletServers the tabletServers we want to remove the server tag from.
 * @param serverTag the server tag to be removed.
 * @return A CompletableFuture indicating completion of the operation.
 */
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
 * Performs load balancing on the cluster's bucket load, based on the provided {@code
 * priorityGoals}.
 *
 * <p>In more detail, Fluss collects the cluster's load information and optimizes the bucket
 * placement according to the user-defined {@code priorityGoals}.
 *
 * <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
 * rebalance task exists, a {@link RebalanceFailureException} will be thrown.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
 *       access to the cluster.
 *   <li>{@link RebalanceFailureException} If the rebalance failed, for example because there
 *       is an ongoing execution.
 * </ul>
 *
 * @param priorityGoals the goals to be optimized.
 * @param dryRun if true, calculate and return the rebalance optimization proposal, but do not
 *     execute it.
 * @return the generated rebalance plan for all the tableBuckets which need to be rebalanced.
 */
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
List<GoalType> priorityGoals, boolean dryRun);

/**
 * Lists the rebalance process.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
 *       access to the cluster.
 *   <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
 * </ul>
 *
 * @return the rebalance process for all the tableBuckets that are being rebalanced.
 */
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();

/**
 * Cancels the rebalance task.
 *
 * <p>The following exceptions can be anticipated:
 *
 * <ul>
 *   <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
 *       access to the cluster.
 *   <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
 * </ul>
 *
 * @return A CompletableFuture indicating completion of the operation.
 */
CompletableFuture<Void> cancelRebalance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.LeaderNotAvailableException;
Expand All @@ -44,8 +48,10 @@
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
import org.apache.fluss.rpc.messages.CreateTableRequest;
Expand All @@ -66,12 +72,15 @@
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.ListTablesResponse;
import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.messages.RebalanceRequest;
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.protocol.ApiError;
Expand Down Expand Up @@ -535,6 +544,41 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
return future;
}

@Override
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
    // Build the request: the tag value first, then one server id per requested tabletServer.
    AddServerTagRequest addTagRequest = new AddServerTagRequest().setServerTag(serverTag.value);
    for (Integer serverId : tabletServers) {
        addTagRequest.addServerId(serverId);
    }
    // The response payload is not used; callers only observe completion or failure.
    return gateway.addServerTag(addTagRequest).thenApply(ignored -> null);
}

@Override
public CompletableFuture<Void> removeServerTag(
        List<Integer> tabletServers, ServerTag serverTag) {
    // Build the request: the tag value first, then one server id per requested tabletServer.
    RemoveServerTagRequest removeTagRequest =
            new RemoveServerTagRequest().setServerTag(serverTag.value);
    for (Integer serverId : tabletServers) {
        removeTagRequest.addServerId(serverId);
    }
    // The response payload is not used; callers only observe completion or failure.
    return gateway.removeServerTag(removeTagRequest).thenApply(ignored -> null);
}

@Override
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
        List<GoalType> priorityGoals, boolean dryRun) {
    RebalanceRequest rebalanceRequest = new RebalanceRequest().setDryRun(dryRun);
    // Goals are added in the order given by the caller.
    for (GoalType goalType : priorityGoals) {
        rebalanceRequest.addGoal(goalType.value);
    }
    // Convert the RPC response into the per-bucket plan map exposed by the client API.
    return gateway.rebalance(rebalanceRequest)
            .thenApply(response -> ClientRpcMessageUtils.toRebalancePlan(response));
}

@Override
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
    // The request carries no fields; the response is converted into a per-bucket result map.
    ListRebalanceProcessRequest listRequest = new ListRebalanceProcessRequest();
    return gateway.listRebalanceProcess(listRequest)
            .thenApply(response -> ClientRpcMessageUtils.toRebalanceProcess(response));
}

@Override
public CompletableFuture<Void> cancelRebalance() {
    // The request carries no fields and the response payload is not used.
    return gateway.cancelRebalance(new CancelRebalanceRequest()).thenApply(ignored -> null);
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.write.KvWriteBatch;
import org.apache.fluss.client.write.ReadyWriteBatch;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceStatusForBucket;
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.fs.FsPath;
Expand All @@ -44,6 +47,7 @@
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.PbAlterConfig;
Expand All @@ -56,10 +60,17 @@
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
import org.apache.fluss.rpc.messages.PbRebalanceProcessForBucket;
import org.apache.fluss.rpc.messages.PbRebalanceProcessForPartition;
import org.apache.fluss.rpc.messages.PbRebalanceProcessForTable;
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.RebalanceResponse;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -328,6 +339,99 @@ public static DropPartitionRequest makeDropPartitionRequest(
return dropPartitionRequest;
}

/**
 * Converts a {@link RebalanceResponse} into a map from each {@link TableBucket} to its
 * {@link RebalancePlanForBucket}.
 *
 * <p>A table entry that reports no partition plans is read as a non-partitioned table: its
 * bucket plans are keyed with a {@code null} partition id. Otherwise only the bucket plans
 * nested under each partition plan are read.
 *
 * @param response the rebalance response to convert.
 * @return the per-bucket rebalance plans contained in the response.
 */
public static Map<TableBucket, RebalancePlanForBucket> toRebalancePlan(
        RebalanceResponse response) {
    Map<TableBucket, RebalancePlanForBucket> planByBucket = new HashMap<>();
    for (PbRebalancePlanForTable tablePlan : response.getPlanForTablesList()) {
        long tableId = tablePlan.getTableId();

        if (tablePlan.getPartitionsPlansCount() != 0) {
            // Partitioned table: bucket plans are nested under each partition plan.
            for (PbRebalancePlanForPartition partitionPlan :
                    tablePlan.getPartitionsPlansList()) {
                long partitionId = partitionPlan.getPartitionId();
                for (PbRebalancePlanForBucket bucketPlan :
                        partitionPlan.getBucketsPlansList()) {
                    int bucketId = bucketPlan.getBucketId();
                    planByBucket.put(
                            new TableBucket(tableId, partitionId, bucketId),
                            toRebalancePlanForBucket(
                                    tableId, partitionId, bucketId, bucketPlan));
                }
            }
            continue;
        }

        // Non-partitioned table: bucket plans hang directly off the table plan.
        for (PbRebalancePlanForBucket bucketPlan : tablePlan.getBucketsPlansList()) {
            int bucketId = bucketPlan.getBucketId();
            planByBucket.put(
                    new TableBucket(tableId, null, bucketId),
                    toRebalancePlanForBucket(tableId, null, bucketId, bucketPlan));
        }
    }
    return planByBucket;
}

/**
 * Builds the {@link RebalancePlanForBucket} for a single bucket from its RPC message.
 *
 * @param tableId id of the table the bucket belongs to.
 * @param partitionId id of the partition the bucket belongs to, or {@code null}.
 * @param bucketId id of the bucket.
 * @param pbBucket the RPC message holding the leaders and replica lists.
 * @return the plan for the identified bucket.
 */
private static RebalancePlanForBucket toRebalancePlanForBucket(
        long tableId,
        @Nullable Long partitionId,
        int bucketId,
        PbRebalancePlanForBucket pbBucket) {
    TableBucket bucket = new TableBucket(tableId, partitionId, bucketId);
    int originalLeader = pbBucket.getOriginalLeader();
    int newLeader = pbBucket.getNewLeader();
    // The replica ids arrive as primitive arrays; box them into lists for the plan object.
    List<Integer> originalReplicas =
            Arrays.stream(pbBucket.getOriginalReplicas()).boxed().collect(Collectors.toList());
    List<Integer> newReplicas =
            Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList());
    return new RebalancePlanForBucket(
            bucket, originalLeader, newLeader, originalReplicas, newReplicas);
}

/**
 * Converts a {@link ListRebalanceProcessResponse} into a map from each {@link TableBucket} to
 * its {@link RebalanceResultForBucket}.
 *
 * <p>For every table entry, the buckets nested under its partition entries are read first
 * (keyed with the partition id), followed by the buckets attached directly to the table entry
 * (keyed with a {@code null} partition id).
 *
 * @param response the list-rebalance-process response to convert.
 * @return the per-bucket rebalance results contained in the response.
 */
public static Map<TableBucket, RebalanceResultForBucket> toRebalanceProcess(
        ListRebalanceProcessResponse response) {
    Map<TableBucket, RebalanceResultForBucket> resultByBucket = new HashMap<>();

    for (PbRebalanceProcessForTable tableProcess : response.getProcessForTablesList()) {
        long tableId = tableProcess.getTableId();

        // Buckets that belong to a partition.
        for (PbRebalanceProcessForPartition partitionProcess :
                tableProcess.getPartitionsProcessesList()) {
            long partitionId = partitionProcess.getPartitionId();
            for (PbRebalanceProcessForBucket bucketProcess :
                    partitionProcess.getBucketsProcessesList()) {
                TableBucket bucket =
                        new TableBucket(tableId, partitionId, bucketProcess.getBucketId());
                resultByBucket.put(bucket, toRebalanceResultForBucket(bucket, bucketProcess));
            }
        }

        // Buckets attached directly to the table entry (no partition id).
        for (PbRebalanceProcessForBucket bucketProcess :
                tableProcess.getBucketsProcessesList()) {
            TableBucket bucket = new TableBucket(tableId, null, bucketProcess.getBucketId());
            resultByBucket.put(bucket, toRebalanceResultForBucket(bucket, bucketProcess));
        }
    }

    return resultByBucket;
}

/**
 * Builds the {@link RebalanceResultForBucket} for one bucket from its RPC message.
 *
 * <p>The first entry of each replica array is passed as the corresponding leader of the plan.
 *
 * <p>NOTE(review): indexing element 0 throws {@link ArrayIndexOutOfBoundsException} if either
 * replica array is empty; confirm the server always sends at least one replica.
 *
 * @param tableBucket the bucket the result belongs to.
 * @param pbRebalanceProcessForBucket the RPC message holding replicas and rebalance status.
 * @return the rebalance result for the bucket.
 */
private static RebalanceResultForBucket toRebalanceResultForBucket(
        TableBucket tableBucket, PbRebalanceProcessForBucket pbRebalanceProcessForBucket) {
    int[] originalReplicaIds = pbRebalanceProcessForBucket.getOriginalReplicas();
    int[] newReplicaIds = pbRebalanceProcessForBucket.getNewReplicas();

    int originalLeader = originalReplicaIds[0];
    int newLeader = newReplicaIds[0];
    List<Integer> originalReplicas =
            Arrays.stream(originalReplicaIds).boxed().collect(Collectors.toList());
    List<Integer> newReplicas =
            Arrays.stream(newReplicaIds).boxed().collect(Collectors.toList());

    RebalancePlanForBucket plan =
            new RebalancePlanForBucket(
                    tableBucket, originalLeader, newLeader, originalReplicas, newReplicas);
    return RebalanceResultForBucket.of(
            plan, RebalanceStatusForBucket.of(pbRebalanceProcessForBucket.getRebalanceStatus()));
}

public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
return response.getPartitionsInfosList().stream()
.map(
Expand Down
Loading
Loading