Skip to content

Commit 922d458

Browse files
committed
[server] Support list rebalance processes
1 parent b5d9f76 commit 922d458

File tree

8 files changed

+196
-20
lines changed

8 files changed

+196
-20
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
7373
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7474
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
75+
import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest;
7576
import org.apache.fluss.rpc.messages.ListTablesRequest;
7677
import org.apache.fluss.rpc.messages.ListTablesResponse;
7778
import org.apache.fluss.rpc.messages.PbAlterConfig;
@@ -568,7 +569,8 @@ public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
568569

569570
@Override
570571
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
571-
throw new UnsupportedOperationException("Support soon");
572+
return gateway.listRebalanceProcess(new ListRebalanceProcessRequest())
573+
.thenApply(ClientRpcMessageUtils::toRebalanceProcess);
572574
}
573575

574576
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
2828
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
30+
import org.apache.fluss.cluster.rebalance.RebalanceStatusForBucket;
2931
import org.apache.fluss.config.cluster.AlterConfigOpType;
3032
import org.apache.fluss.config.cluster.ConfigEntry;
3133
import org.apache.fluss.fs.FsPath;
@@ -45,6 +47,7 @@
4547
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4648
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
4749
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
50+
import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse;
4851
import org.apache.fluss.rpc.messages.LookupRequest;
4952
import org.apache.fluss.rpc.messages.MetadataRequest;
5053
import org.apache.fluss.rpc.messages.PbAlterConfig;
@@ -60,6 +63,9 @@
6063
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
6164
import org.apache.fluss.rpc.messages.PbRebalancePlanForPartition;
6265
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
66+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForBucket;
67+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForPartition;
68+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForTable;
6369
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6470
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6571
import org.apache.fluss.rpc.messages.ProduceLogRequest;
@@ -375,6 +381,57 @@ private static RebalancePlanForBucket toRebalancePlanForBucket(
375381
Arrays.stream(pbBucket.getNewReplicas()).boxed().collect(Collectors.toList()));
376382
}
377383

384+
public static Map<TableBucket, RebalanceResultForBucket> toRebalanceProcess(
385+
ListRebalanceProcessResponse response) {
386+
Map<TableBucket, RebalanceResultForBucket> rebalanceProcess = new HashMap<>();
387+
388+
for (PbRebalanceProcessForTable pbRebalanceProcessForTable :
389+
response.getProcessForTablesList()) {
390+
long tableId = pbRebalanceProcessForTable.getTableId();
391+
392+
for (PbRebalanceProcessForPartition pbRebalanceProcessForPartition :
393+
pbRebalanceProcessForTable.getPartitionsProcessesList()) {
394+
long partitionId = pbRebalanceProcessForPartition.getPartitionId();
395+
396+
for (PbRebalanceProcessForBucket pbRebalanceProcessForBucket :
397+
pbRebalanceProcessForPartition.getBucketsProcessesList()) {
398+
int bucketId = pbRebalanceProcessForBucket.getBucketId();
399+
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
400+
rebalanceProcess.put(
401+
tableBucket,
402+
toRebalanceResultForBucket(tableBucket, pbRebalanceProcessForBucket));
403+
}
404+
}
405+
406+
for (PbRebalanceProcessForBucket pbRebalanceProcessForBucket :
407+
pbRebalanceProcessForTable.getBucketsProcessesList()) {
408+
int bucketId = pbRebalanceProcessForBucket.getBucketId();
409+
TableBucket tableBucket = new TableBucket(tableId, null, bucketId);
410+
rebalanceProcess.put(
411+
tableBucket,
412+
toRebalanceResultForBucket(tableBucket, pbRebalanceProcessForBucket));
413+
}
414+
}
415+
416+
return rebalanceProcess;
417+
}
418+
419+
private static RebalanceResultForBucket toRebalanceResultForBucket(
420+
TableBucket tableBucket, PbRebalanceProcessForBucket pbRebalanceProcessForBucket) {
421+
return RebalanceResultForBucket.of(
422+
new RebalancePlanForBucket(
423+
tableBucket,
424+
pbRebalanceProcessForBucket.getOriginalReplicas()[0],
425+
pbRebalanceProcessForBucket.getNewReplicas()[0],
426+
Arrays.stream(pbRebalanceProcessForBucket.getOriginalReplicas())
427+
.boxed()
428+
.collect(Collectors.toList()),
429+
Arrays.stream(pbRebalanceProcessForBucket.getNewReplicas())
430+
.boxed()
431+
.collect(Collectors.toList())),
432+
RebalanceStatusForBucket.of(pbRebalanceProcessForBucket.getRebalanceStatus()));
433+
}
434+
378435
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
379436
return response.getPartitionsInfosList().stream()
380437
.map(

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import org.apache.fluss.client.Connection;
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
24+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2325
import org.apache.fluss.cluster.rebalance.ServerTag;
2426
import org.apache.fluss.config.ConfigOptions;
2527
import org.apache.fluss.config.Configuration;
2628
import org.apache.fluss.metadata.DatabaseDescriptor;
2729
import org.apache.fluss.metadata.PartitionSpec;
30+
import org.apache.fluss.metadata.TableBucket;
2831
import org.apache.fluss.metadata.TableDescriptor;
2932
import org.apache.fluss.metadata.TablePath;
3033
import org.apache.fluss.server.replica.ReplicaManager;
@@ -38,6 +41,8 @@
3841
import java.time.Duration;
3942
import java.util.Arrays;
4043
import java.util.Collections;
44+
import java.util.Map;
45+
import java.util.stream.Collectors;
4146

4247
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
4348
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -130,19 +135,25 @@ void testRebalanceForLogTable() throws Exception {
130135
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
131136

132137
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
133-
admin.rebalance(
134-
Arrays.asList(
135-
GoalType.REPLICA_DISTRIBUTION_GOAL,
136-
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
137-
false)
138-
.get();
138+
Map<TableBucket, RebalancePlanForBucket> rebalancePlan =
139+
admin.rebalance(
140+
Arrays.asList(
141+
GoalType.REPLICA_DISTRIBUTION_GOAL,
142+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
143+
false)
144+
.get();
145+
146+
// verify list rebalance process
147+
Map<TableBucket, RebalanceResultForBucket> rebalanceProcess =
148+
admin.listRebalanceProcess().get();
149+
Map<TableBucket, RebalancePlanForBucket> rebalanceProcessPlan =
150+
getRebalancePlanFromResult(rebalanceProcess);
151+
assertThat(rebalancePlan).containsExactlyEntriesOf(rebalanceProcessPlan);
139152

140153
retry(
141154
Duration.ofMinutes(2),
142155
() -> {
143-
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
144-
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
145-
.isNotPresent();
156+
assertThat(admin.listRebalanceProcess().get()).isEmpty();
146157
for (int i = 0; i < 4; i++) {
147158
ReplicaManager replicaManager =
148159
FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager();
@@ -157,18 +168,20 @@ void testRebalanceForLogTable() throws Exception {
157168
// add server tag PERMANENT_OFFLINE for server 3, trigger all leader and replica removed
158169
// from server 3.
159170
admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get();
160-
admin.rebalance(
161-
Arrays.asList(
162-
GoalType.REPLICA_DISTRIBUTION_GOAL,
163-
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
164-
false)
165-
.get();
171+
rebalancePlan =
172+
admin.rebalance(
173+
Arrays.asList(
174+
GoalType.REPLICA_DISTRIBUTION_GOAL,
175+
GoalType.LEADER_REPLICA_DISTRIBUTION_GOAL),
176+
false)
177+
.get();
178+
rebalanceProcess = admin.listRebalanceProcess().get();
179+
rebalanceProcessPlan = getRebalancePlanFromResult(rebalanceProcess);
180+
assertThat(rebalancePlan).containsExactlyEntriesOf(rebalanceProcessPlan);
166181
retry(
167182
Duration.ofMinutes(2),
168183
() -> {
169-
// TODO use admin#listRebalanceProcess to verify rebalance is finished.
170-
assertThat(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().getRebalancePlan())
171-
.isNotPresent();
184+
assertThat(admin.listRebalanceProcess().get()).isEmpty();
172185
assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0);
173186
assertThat(replicaManager3.leaderCount()).isEqualTo(0);
174187
for (int i = 0; i < 3; i++) {
@@ -199,4 +212,10 @@ private long createTable(
199212
admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
200213
return admin.getTableInfo(tablePath).get().getTableId();
201214
}
215+
216+
private Map<TableBucket, RebalancePlanForBucket> getRebalancePlanFromResult(
217+
Map<TableBucket, RebalanceResultForBucket> rebalanceProcess) {
218+
return rebalanceProcess.entrySet().stream()
219+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().planForBucket()));
220+
}
202221
}

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public List<Integer> newReplicas() {
5151
return rebalancePlanForBucket.getNewReplicas();
5252
}
5353

54+
public List<Integer> originReplicas() {
55+
return rebalancePlanForBucket.getOriginReplicas();
56+
}
57+
5458
public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) {
5559
this.rebalanceStatusForBucket = status;
5660
return this;

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public enum RebalanceStatusForBucket {
3737
this.code = code;
3838
}
3939

40+
public int getCode() {
41+
return code;
42+
}
43+
4044
public static RebalanceStatusForBucket of(int code) {
4145
for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) {
4246
if (status.code == code) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec;
156156
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse;
157157
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
158+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListRebalanceProcessResponse;
158159
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeRebalanceRespose;
159160
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableChanges;
160161
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
@@ -820,7 +821,15 @@ public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request)
820821
@Override
821822
public CompletableFuture<ListRebalanceProcessResponse> listRebalanceProcess(
822823
ListRebalanceProcessRequest request) {
823-
throw new UnsupportedOperationException("Support soon!");
824+
if (authorizer != null) {
825+
authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster());
826+
}
827+
828+
RebalanceManager rebalanceManager = rebalanceManagerSupplier.get();
829+
return CompletableFuture.completedFuture(
830+
makeListRebalanceProcessResponse(
831+
rebalanceManager.getOngoingRebalanceTasks(),
832+
rebalanceManager.getFinishedRebalanceTasks()));
824833
}
825834

826835
@Override

fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ public void startup() {
100100
registerRebalanceFromZookeeper();
101101
}
102102

103+
public Map<TableBucket, RebalanceResultForBucket> getOngoingRebalanceTasks() {
104+
checkNotClosed();
105+
return inLock(lock, () -> ongoingRebalanceTasks);
106+
}
107+
108+
public Map<TableBucket, RebalanceResultForBucket> getFinishedRebalanceTasks() {
109+
checkNotClosed();
110+
return inLock(lock, () -> finishedRebalanceTasks);
111+
}
112+
103113
private void registerRebalanceFromZookeeper() {
104114
try {
105115
zkClient.getRebalancePlan()

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.cluster.ServerNode;
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
24+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.config.cluster.AlterConfigOpType;
2627
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -72,6 +73,7 @@
7273
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7374
import org.apache.fluss.rpc.messages.ListOffsetsResponse;
7475
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
76+
import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse;
7577
import org.apache.fluss.rpc.messages.LookupRequest;
7678
import org.apache.fluss.rpc.messages.LookupResponse;
7779
import org.apache.fluss.rpc.messages.MetadataResponse;
@@ -117,6 +119,8 @@
117119
import org.apache.fluss.rpc.messages.PbPutKvRespForBucket;
118120
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
119121
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
122+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForBucket;
123+
import org.apache.fluss.rpc.messages.PbRebalanceProcessForTable;
120124
import org.apache.fluss.rpc.messages.PbRemoteLogSegment;
121125
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
122126
import org.apache.fluss.rpc.messages.PbServerNode;
@@ -182,6 +186,7 @@
182186
import java.util.Optional;
183187
import java.util.OptionalInt;
184188
import java.util.Set;
189+
import java.util.function.BiConsumer;
185190
import java.util.stream.Collectors;
186191
import java.util.stream.Stream;
187192

@@ -1768,6 +1773,72 @@ private static PbRebalancePlanForBucket toPbRebalancePlanForBucket(
17681773
return pbRebalancePlanForBucket;
17691774
}
17701775

1776+
public static ListRebalanceProcessResponse makeListRebalanceProcessResponse(
1777+
Map<TableBucket, RebalanceResultForBucket> ongoingRebalanceTasks,
1778+
Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks) {
1779+
ListRebalanceProcessResponse response = new ListRebalanceProcessResponse();
1780+
1781+
Map<Long, List<PbRebalanceProcessForBucket>> processForTables = new HashMap<>();
1782+
Map<Long, Map<Long, List<PbRebalanceProcessForBucket>>> processForPartitions =
1783+
new HashMap<>();
1784+
1785+
BiConsumer<TableBucket, RebalanceResultForBucket> collectProcessResult =
1786+
(tableBucket, rebalanceResultForBucket) -> {
1787+
if (tableBucket.getPartitionId() == null) {
1788+
processForTables
1789+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
1790+
.add(
1791+
toPbRebalanceProcessForBucket(
1792+
tableBucket, rebalanceResultForBucket));
1793+
} else {
1794+
processForPartitions
1795+
.computeIfAbsent(tableBucket.getTableId(), k -> new HashMap<>())
1796+
.computeIfAbsent(
1797+
tableBucket.getPartitionId(), k -> new ArrayList<>())
1798+
.add(
1799+
toPbRebalanceProcessForBucket(
1800+
tableBucket, rebalanceResultForBucket));
1801+
}
1802+
};
1803+
1804+
ongoingRebalanceTasks.forEach(collectProcessResult);
1805+
finishedRebalanceTasks.forEach(collectProcessResult);
1806+
1807+
processForTables.forEach(
1808+
(tableId, processForBuckets) ->
1809+
response.addProcessForTable()
1810+
.setTableId(tableId)
1811+
.addAllBucketsProcesses(processForBuckets));
1812+
processForPartitions.forEach(
1813+
(tableId, processForPartition) -> {
1814+
PbRebalanceProcessForTable processForTable =
1815+
response.addProcessForTable().setTableId(tableId);
1816+
processForPartition.forEach(
1817+
(partitionId, processForBuckets) ->
1818+
processForTable
1819+
.addPartitionsProcess()
1820+
.setPartitionId(partitionId)
1821+
.addAllBucketsProcesses(processForBuckets));
1822+
});
1823+
1824+
return response;
1825+
}
1826+
1827+
private static PbRebalanceProcessForBucket toPbRebalanceProcessForBucket(
1828+
TableBucket tableBucket, RebalanceResultForBucket rebalanceResultForBucket) {
1829+
return new PbRebalanceProcessForBucket()
1830+
.setBucketId(tableBucket.getBucket())
1831+
.setOriginalReplicas(
1832+
rebalanceResultForBucket.originReplicas().stream()
1833+
.mapToInt(Integer::intValue)
1834+
.toArray())
1835+
.setNewReplicas(
1836+
rebalanceResultForBucket.newReplicas().stream()
1837+
.mapToInt(Integer::intValue)
1838+
.toArray())
1839+
.setRebalanceStatus(rebalanceResultForBucket.status().getCode());
1840+
}
1841+
17711842
private static <T> Map<TableBucket, T> mergeResponse(
17721843
Map<TableBucket, T> response, Map<TableBucket, T> errors) {
17731844
if (errors.isEmpty()) {

0 commit comments

Comments
 (0)