Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (
Browse files Browse the repository at this point in the history
apache#21353)

### Motivation

See pip: apache#21129

### Modifications

Add  `BrokerTimeAverageDataResources`
  • Loading branch information
AnonHxy authored Oct 23, 2023
1 parent 0e9bb8a commit 30d59e3
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;

@Getter
public class LoadBalanceResources {
public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
public static final String BROKER_TIME_AVERAGE_BASE_PATH = "/loadbalance/broker-time-average";

private final BundleDataResources bundleDataResources;
private final BrokerTimeAverageDataResources brokerTimeAverageDataResources;

public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
bundleDataResources = new BundleDataResources(store, operationTimeoutSec);
brokerTimeAverageDataResources = new BrokerTimeAverageDataResources(store, operationTimeoutSec);
}

public static class BundleDataResources extends BaseResources<BundleData> {
Expand Down Expand Up @@ -69,4 +73,23 @@ private String getBundleDataPath(final String bundle) {
return BUNDLE_DATA_BASE_PATH + "/" + bundle;
}
}

public static class BrokerTimeAverageDataResources extends BaseResources<TimeAverageBrokerData> {
public BrokerTimeAverageDataResources(MetadataStore store, int operationTimeoutSec) {
super(store, TimeAverageBrokerData.class, operationTimeoutSec);
}

public CompletableFuture<Void> updateTimeAverageBrokerData(String brokerLookupAddress,
TimeAverageBrokerData data) {
return setWithCreateAsync(getTimeAverageBrokerDataPath(brokerLookupAddress), __ -> data);
}

public CompletableFuture<Void> deleteTimeAverageBrokerData(String brokerLookupAddress) {
return deleteAsync(getTimeAverageBrokerDataPath(brokerLookupAddress));
}

private String getTimeAverageBrokerDataPath(final String brokerLookupAddress) {
return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Path to ZNode whose children contain ResourceQuota jsons.
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";

// Path to ZNode containing TimeAverageBrokerData jsons for each broker.
public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";

// Set of broker candidates to reuse so that object creation is avoided.
private final Set<String> brokerCandidateCache;

Expand All @@ -119,7 +116,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private ResourceLock<LocalBrokerData> brokerDataLock;

private MetadataCache<ResourceQuota> resourceQuotaCache;
private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;

// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;
Expand Down Expand Up @@ -245,7 +241,6 @@ public void initialize(final PulsarService pulsar) {
this.pulsarResources = pulsar.getPulsarResources();
brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);

Expand Down Expand Up @@ -991,13 +986,13 @@ public void start() throws PulsarServerException {

String lookupServiceAddress = pulsar.getLookupServiceAddress();
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
updateLocalBrokerData();

brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();

timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
__ -> new TimeAverageBrokerData()).join();
pulsarResources.getLoadBalanceResources()
.getBrokerTimeAverageDataResources()
.updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData())
.join();
updateAll();
} catch (Exception e) {
log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e);
Expand Down Expand Up @@ -1154,9 +1149,8 @@ public void writeBundleDataOnZooKeeper() {
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
final String broker = entry.getKey();
final TimeAverageBrokerData data = entry.getValue().getTimeAverageData();
futures.add(timeAverageBrokerDataCache.readModifyUpdateOrCreate(
TIME_AVERAGE_BROKER_ZPATH + "/" + broker, __ -> data)
.thenApply(__ -> null));
futures.add(pulsarResources.getLoadBalanceResources()
.getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, data));
}

try {
Expand All @@ -1177,13 +1171,13 @@ private void deleteBundleDataFromMetadataStore(String bundle) {
}

private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, ex) -> {
if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
log.warn("Failed to delete dead broker {} time "
+ "average data from metadata store", broker, ex);
}
});
pulsarResources.getLoadBalanceResources()
.getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__, ex) -> {
if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) {
log.warn("Failed to delete dead broker {} time "
+ "average data from metadata store", broker, ex);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;

import static java.lang.Thread.sleep;
import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -778,7 +778,7 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception {

List<String> data = pulsar1.getLocalMetadataStore()
.getMetadataCache(TimeAverageBrokerData.class)
.getChildren(TIME_AVERAGE_BROKER_ZPATH)
.getChildren(BROKER_TIME_AVERAGE_BASE_PATH)
.join();

Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;

import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
Expand All @@ -34,7 +35,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
Expand Down Expand Up @@ -172,7 +172,7 @@ private void printGlobalData() {
final LocalBrokerData localData = (LocalBrokerData) data;
numBundles = localData.getNumBundles();
messageRate = localData.getMsgRateIn() + localData.getMsgRateOut();
final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
try {
final TimeAverageBrokerData timeAverageData = gson.fromJson(
new String(zkClient.getData(timeAveragePath, false, null)),
Expand Down Expand Up @@ -314,7 +314,7 @@ private synchronized void printData(final String path) {
printLoadReport(broker, gson.fromJson(jsonString, LoadReport.class));
} else {
final LocalBrokerData localBrokerData = gson.fromJson(jsonString, LocalBrokerData.class);
final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
try {
final TimeAverageBrokerData timeAverageData = gson.fromJson(
new String(zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);
Expand Down

0 comments on commit 30d59e3

Please sign in to comment.