Skip to content

Commit ed72208

Browse files
committed
Address review comments
1 parent 8543d6d commit ed72208

File tree

7 files changed

+330
-47
lines changed

7 files changed

+330
-47
lines changed

Diff for: pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ public AdjustableSemaphore(int permits, boolean fair) {
4141

4242
/**
4343
* Sets the total number of permits to the given value without blocking.
44+
* Synchronized to allow multiple threads to update permits concurrently
4445
*/
45-
public void setPermits(int permits) {
46+
public synchronized void setPermits(int permits) {
4647
Preconditions.checkArgument(permits > 0, "Permits must be a positive integer");
4748
if (permits < _totalPermits) {
4849
reducePermits(_totalPermits - permits);

Diff for: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java

+88-23
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.base.Preconditions;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import org.apache.pinot.common.concurrency.AdjustableSemaphore;
2526
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
2627
import org.apache.pinot.spi.utils.CommonConstants;
@@ -36,87 +37,146 @@ public class SegmentPreprocessThrottler implements PinotClusterConfigChangeListe
3637
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
3738

3839
/**
39-
* _maxPreprocessConcurrency must be >= 0. To effectively disable throttling, this can be set to a very high value
40+
* _maxPreprocessConcurrency and _maxConcurrentPreprocessesBeforeServingQueries must be >= 0. To effectively disable
41+
* throttling, this can be set to a very high value
4042
*/
4143
private int _maxPreprocessConcurrency;
44+
private int _maxPreprocessConcurrencyBeforeServingQueries;
4245
private boolean _relaxThrottling;
4346
private final AdjustableSemaphore _semaphore;
4447

4548
/**
4649
* @param maxPreprocessConcurrency configured preprocessing concurrency
47-
* @param maxConcurrentPreprocessesBeforeServingQueries configured preprocessing concurrency before serving queries
50+
* @param maxPreprocessConcurrencyBeforeServingQueries configured preprocessing concurrency before serving queries
4851
* @param relaxThrottling whether to relax throttling prior to serving queries
4952
*/
50-
public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int maxConcurrentPreprocessesBeforeServingQueries,
53+
public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int maxPreprocessConcurrencyBeforeServingQueries,
5154
boolean relaxThrottling) {
5255
LOGGER.info("Initializing SegmentPreprocessThrottler, maxPreprocessConcurrency: {}, "
53-
+ "maxConcurrentPreprocessesBeforeServingQueries: {}, relaxThrottling: {}",
54-
maxPreprocessConcurrency, maxConcurrentPreprocessesBeforeServingQueries, relaxThrottling);
56+
+ "maxPreprocessConcurrencyBeforeServingQueries: {}, relaxThrottling: {}",
57+
maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, relaxThrottling);
5558
Preconditions.checkArgument(maxPreprocessConcurrency > 0,
56-
"Max preprocess parallelism must be >= 0, but found to be: " + maxPreprocessConcurrency);
57-
Preconditions.checkArgument(maxConcurrentPreprocessesBeforeServingQueries > 0,
58-
"Max preprocess parallelism before serving queries must be >= 0, but found to be: "
59-
+ maxConcurrentPreprocessesBeforeServingQueries);
59+
"Max preprocess parallelism must be > 0, but found to be: " + maxPreprocessConcurrency);
60+
Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 0,
61+
"Max preprocess parallelism before serving queries must be > 0, but found to be: "
62+
+ maxPreprocessConcurrencyBeforeServingQueries);
6063

6164
_maxPreprocessConcurrency = maxPreprocessConcurrency;
65+
_maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries;
6266
_relaxThrottling = relaxThrottling;
6367

6468
// maxConcurrentPreprocessesBeforeServingQueries is only used prior to serving queries and once the server is
6569
// ready to serve queries this is not used again. Thus, it is safe to only pick up this configuration during
6670
// server startup. There is no need to allow updates to this via the ZK CLUSTER config handler
67-
int relaxThrottlingThreshold = Math.max(_maxPreprocessConcurrency, maxConcurrentPreprocessesBeforeServingQueries);
71+
int relaxThrottlingThreshold = Math.max(_maxPreprocessConcurrency, _maxPreprocessConcurrencyBeforeServingQueries);
6872
int preprocessConcurrency = _maxPreprocessConcurrency;
6973
if (relaxThrottling) {
7074
preprocessConcurrency = relaxThrottlingThreshold;
7175
LOGGER.info("Relax throttling enabled, setting preprocess concurrency to: {}", preprocessConcurrency);
7276
}
7377
_semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
74-
LOGGER.info("Created semaphore with available permits: {}", _semaphore.availablePermits());
78+
LOGGER.info("Created semaphore with total permits: {}, available permits: {}", totalPermits(),
79+
availablePermits());
7580
}
7681

7782
public synchronized void resetThrottling() {
78-
LOGGER.info("Reset throttling threshold for segment preprocess concurrency, available permits: {}",
79-
_semaphore.availablePermits());
83+
LOGGER.info("Reset throttling threshold for segment preprocess concurrency, total permits: {}, available "
84+
+ "permits: {}", totalPermits(), availablePermits());
8085
_relaxThrottling = false;
8186
_semaphore.setPermits(_maxPreprocessConcurrency);
82-
LOGGER.info("Reset throttling completed, new concurrency: {}, available permits: {}", _maxPreprocessConcurrency,
83-
_semaphore.availablePermits());
87+
LOGGER.info("Reset throttling completed, new concurrency: {}, total permits: {}, available permits: {}",
88+
_maxPreprocessConcurrency, totalPermits(), availablePermits());
8489
}
8590

8691
@Override
87-
public synchronized void onChange(Map<String, String> clusterConfigs) {
92+
public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
8893
if (clusterConfigs == null || clusterConfigs.isEmpty()) {
8994
LOGGER.info("Skip updating SegmentPreprocessThrottler configs with empty clusterConfigs");
9095
return;
9196
}
97+
98+
if (changedConfigs == null || changedConfigs.isEmpty()) {
99+
LOGGER.info("Skip updating SegmentPreprocessThrottler configs with unchanged clusterConfigs");
100+
return;
101+
}
102+
92103
LOGGER.info("Updating SegmentPreprocessThrottler configs with latest clusterConfigs");
104+
handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
105+
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, clusterConfigs);
106+
LOGGER.info("Updated SegmentPreprocessThrottler configs with latest clusterConfigs, total permits: {}",
107+
totalPermits());
108+
}
109+
110+
private void handleMaxPreprocessConcurrencyChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
111+
if (!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM)) {
112+
LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was not updated, skipping updates");
113+
return;
114+
}
115+
93116
String configName = CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
94117
String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
95118
String maxParallelSegmentPreprocessesStr = clusterConfigs.getOrDefault(configName, defaultConfigValue);
96119
int maxPreprocessConcurrency = Integer.parseInt(maxParallelSegmentPreprocessesStr);
97120

98121
if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
99-
LOGGER.info("No ZK update for SegmentPreprocessThrottler, available permits: {}", _semaphore.availablePermits());
122+
LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total permits: {}", _maxPreprocessConcurrency,
123+
totalPermits());
100124
return;
101125
}
102126

103127
if (maxPreprocessConcurrency <= 0) {
104-
LOGGER.warn("Invalid max preprocess parallelism set: {}, not making change, fix config and try again",
128+
LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making change, fix config and try again",
105129
maxPreprocessConcurrency);
106130
return;
107131
}
108132

109-
LOGGER.info("Updated max preprocess parallelism from: {} to: {}", _maxPreprocessConcurrency,
133+
LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", _maxPreprocessConcurrency,
110134
maxPreprocessConcurrency);
111-
112135
_maxPreprocessConcurrency = maxPreprocessConcurrency;
136+
113137
if (_relaxThrottling) {
114-
LOGGER.warn("Reset throttling has not yet been called, not updating the permits");
138+
LOGGER.warn("Reset throttling hasn't been called yet, not updating the permits with maxPreprocessConcurrency");
115139
return;
116140
}
117-
118141
_semaphore.setPermits(_maxPreprocessConcurrency);
119-
LOGGER.info("Updated SegmentPreprocessThrottler configs with latest clusterConfigs");
142+
}
143+
144+
private void handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> changedConfigs,
145+
Map<String, String> clusterConfigs) {
146+
if (!changedConfigs.contains(
147+
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)) {
148+
LOGGER.info("changedConfigs list indicates maxPreprocessConcurrencyBeforeServingQueries was not updated, "
149+
+ "skipping updates");
150+
return;
151+
}
152+
153+
String configName = CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
154+
String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
155+
String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
156+
clusterConfigs.getOrDefault(configName, defaultConfigValue);
157+
int maxPreprocessConcurrencyBeforeServingQueries =
158+
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
159+
160+
if (maxPreprocessConcurrencyBeforeServingQueries == _maxPreprocessConcurrencyBeforeServingQueries) {
161+
LOGGER.info("No ZK update for maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
162+
_maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
163+
return;
164+
}
165+
166+
if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
167+
LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: {}, not making change, fix config "
168+
+ "and try again", maxPreprocessConcurrencyBeforeServingQueries);
169+
return;
170+
}
171+
172+
LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} to: {}",
173+
_maxPreprocessConcurrencyBeforeServingQueries, maxPreprocessConcurrencyBeforeServingQueries);
174+
_maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries;
175+
if (_relaxThrottling) {
176+
LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries was updated before reset throttling was called, "
177+
+ "updating the permits");
178+
_semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
179+
}
120180
}
121181

122182
/**
@@ -144,4 +204,9 @@ public void release() {
144204
int availablePermits() {
145205
return _semaphore.availablePermits();
146206
}
207+
208+
@VisibleForTesting
209+
int totalPermits() {
210+
return _semaphore.getTotalPermits();
211+
}
147212
}

0 commit comments

Comments
 (0)