Supports Force Committing Segments in Batches #14811

Open: noob-se7en wants to merge 62 commits into base: master. Changes from 13 commits are shown below.

Commits (62, all by noob-se7en):
eeb5be1  Supports batching in ForceCommit API (Jan 12, 2025)
5f5a554  nit (Jan 12, 2025)
ca5104a  Refactoring (Jan 14, 2025)
434e8a3  nit (Jan 14, 2025)
504f3c9  nit (Jan 14, 2025)
987bb00  nit (Jan 14, 2025)
ff25c5f  nit (Jan 14, 2025)
e28ff47  nit (Jan 14, 2025)
99a7cee  nit (Jan 14, 2025)
255bc34  lint (Jan 14, 2025)
3a9e41a  nit (Jan 14, 2025)
b2eeb85  fixes lint (Jan 14, 2025)
1782207  nit (Jan 14, 2025)
90db3b8  Merge branch 'master' of github.com:Harnoor7/pinot into add_batch_for… (Jan 15, 2025)
fa418b9  refactoring (Jan 15, 2025)
470c6eb  refactoring (Jan 15, 2025)
8de7bfc  fixes bug (Jan 15, 2025)
4f2d4fc  nit (Jan 15, 2025)
50af02e  nit (Jan 15, 2025)
09d557e  nit (Jan 15, 2025)
32b7fd5  nit (Jan 15, 2025)
e334983  nit (Jan 15, 2025)
1aecc5a  fix_bug (Jan 15, 2025)
5be2722  Adds scheduling logic in controller (Jan 15, 2025)
153a897  nit (Jan 15, 2025)
430127d  fixes lint (Jan 15, 2025)
f20948e  fixes bug (Jan 15, 2025)
5012b5f  nit (Jan 15, 2025)
c2312d2  nit (Jan 15, 2025)
49474f5  fix bug (Jan 16, 2025)
ab2220f  Updates foceCommit API to handle Pauseless (Jan 16, 2025)
ed90f11  updates metadata (Jan 16, 2025)
e88aa2a  fixes lint (Jan 16, 2025)
8c8d8d3  adds tests (Jan 16, 2025)
1be0316  saves 1 zk call (Jan 16, 2025)
55fa6e2  Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot… (Jan 16, 2025)
3297ddd  updates log (Jan 16, 2025)
748d0d3  Adds tests (Jan 16, 2025)
095acc0  Addresses PR comments (Jan 17, 2025)
36360b8  nit (Jan 17, 2025)
d3d42ca  Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot… (Jan 17, 2025)
262bee0  pulls latest changes for pauseless (Jan 17, 2025)
68cdc26  adds unit test (Jan 17, 2025)
9f833c6  Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot… (Jan 17, 2025)
f5d68ae  addresses comment (Jan 17, 2025)
bffab6d  Merge branch 'master' of github.com:apache/pinot into add_batch_force… (Jan 17, 2025)
a1079c2  Addresses Pr comment (Jan 17, 2025)
b8a2e7f  Merge branch 'master' of github.com:apache/pinot into update_forceCom… (Jan 17, 2025)
5730a06  addresses PR comments (Jan 17, 2025)
c8565d6  nit (Jan 17, 2025)
165e7ab  Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot… (Jan 17, 2025)
0cab772  refactoring (Jan 17, 2025)
de04824  Addresses PR comments (Jan 17, 2025)
b95a2f6  nit (Jan 17, 2025)
857dd6a  attempt to fix test (Jan 23, 2025)
2f7e5d9  Merge branch 'master' of github.com:apache/pinot into update_forceCom… (Jan 23, 2025)
0b64439  nit (Jan 23, 2025)
9e3ddad  Attempts to fix test (Jan 23, 2025)
01604e9  Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot… (Jan 23, 2025)
bb84ae2  Merge branch 'master' of github.com:apache/pinot into add_batch_force… (Jan 24, 2025)
71f4ee1  Attempts to fix test (Jan 24, 2025)
2408d13  attempt to fix test (Jan 24, 2025)

Files changed

ForceCommitMessage.java
@@ -35,14 +35,16 @@ public class ForceCommitMessage extends Message {
  public static final String FORCE_COMMIT_MSG_SUB_TYPE = "FORCE_COMMIT";
  private static final String TABLE_NAME = "tableName";
  private static final String SEGMENT_NAMES = "segmentNames";
  private static final String BATCH_SIZE = "batchSize";

  public ForceCommitMessage(String tableNameWithType, Set<String> segmentNames, int batchSize) {
    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
    setMsgSubType(FORCE_COMMIT_MSG_SUB_TYPE);
    setExecutionTimeout(-1); // no timeout
    ZNRecord znRecord = getRecord();
    znRecord.setSimpleField(TABLE_NAME, tableNameWithType);
    znRecord.setSimpleField(SEGMENT_NAMES, String.join(",", segmentNames));
    znRecord.setIntField(BATCH_SIZE, batchSize);
  }

  public ForceCommitMessage(Message message) {
@@ -59,4 +61,8 @@ public String getTableName() {
  public Set<String> getSegmentNames() {
    return Arrays.stream(getRecord().getSimpleField(SEGMENT_NAMES).split(",")).collect(Collectors.toSet());
  }

  public int getBatchSize() {
    return getRecord().getIntField(BATCH_SIZE, Integer.MAX_VALUE);
  }
}
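
Because getBatchSize() falls back to Integer.MAX_VALUE when the BATCH_SIZE field is absent, a message written by an older controller degrades gracefully to the previous commit-everything-at-once behavior. A minimal round-trip sketch (illustrative only; the table and segment names are made up):

  // Hypothetical usage, not part of the PR.
  Set<String> segments = Set.of("myTable__0__15__20250112T0000Z", "myTable__1__15__20250112T0000Z");
  ForceCommitMessage sent = new ForceCommitMessage("myTable_REALTIME", segments, 4);

  // The receiving server re-wraps the generic Helix message:
  ForceCommitMessage received = new ForceCommitMessage(sent);
  assert received.getBatchSize() == 4;
  // A record with no BATCH_SIZE field (older controller) would yield Integer.MAX_VALUE
  // instead, i.e. a single unbounded batch.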

HelixHelper.java
@@ -44,6 +44,7 @@
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -250,6 +251,22 @@ public static IdealState getTableIdealState(HelixManager manager, String resourceName) {
    return accessor.getProperty(builder.idealStates(resourceName));
  }

  public static Set<String> getOnlineSegmentsFromIdealState(HelixManager manager, String tableNameWithType,
      boolean includeConsuming) {
    IdealState tableIdealState = getTableIdealState(manager, tableNameWithType);
    Preconditions.checkState(tableIdealState != null, "Table ideal state is null");
    Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
    Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
      Map<String, String> instanceStateMap = entry.getValue();
      if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
          || (includeConsuming
          && instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING))) {
        matchingSegments.add(entry.getKey());
      }
    }
    return matchingSegments;
  }

  public static ExternalView getExternalViewForResource(HelixAdmin admin, String clusterName, String resourceName) {
    return admin.getResourceExternalView(clusterName, resourceName);
  }
  …
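
A hedged usage sketch of the new helper (helixManager and the table name here are assumptions, not from the PR):

  // With includeConsuming=false a segment counts only if some replica is ONLINE in the
  // ideal state; with includeConsuming=true, segments with CONSUMING replicas also count.
  Set<String> online =
      HelixHelper.getOnlineSegmentsFromIdealState(helixManager, "myTable_REALTIME", false);
  Set<String> onlineOrConsuming =
      HelixHelper.getOnlineSegmentsFromIdealState(helixManager, "myTable_REALTIME", true);

The server-side status check below (isBatchSuccessful) calls it with includeConsuming=false, so a segment is only counted once its force commit has flipped it to ONLINE.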

Controller REST resource (forceCommit endpoint)
@@ -170,19 +170,28 @@ public Map<String, String> forceCommit(
      @ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
      String partitionGroupIds,
      @ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
      String consumingSegments,
      @ApiParam(value = "Max number of segments a server can commit at once") @QueryParam("batchSize")
      Integer batchSize, @Context HttpHeaders headers) {
    tableName = DatabaseUtils.translateTableName(tableName, headers);
    if (partitionGroupIds != null && consumingSegments != null) {
      throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit",
          Response.Status.BAD_REQUEST);
    }
    if (batchSize == null) {
      batchSize = Integer.MAX_VALUE;
    } else if (batchSize <= 0) {
      throw new ControllerApplicationException(LOGGER, "Batch size should be greater than zero",
          Response.Status.BAD_REQUEST);
    }
    long startTimeMs = System.currentTimeMillis();
    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
    validateTable(tableNameWithType);
    Map<String, String> response = new HashMap<>();
    try {
      Set<String> consumingSegmentsForceCommitted =
          _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments,
              batchSize);
      response.put("forceCommitStatus", "SUCCESS");
      try {
        String jobId = UUID.randomUUID().toString();
  …
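
Assuming this method continues to back the controller's POST /tables/{tableName}/forceCommit route (worth verifying against the build), a capped force commit of all consuming segments would look like:

  POST /tables/myTable/forceCommit?batchSize=4

Omitting batchSize preserves the old semantics: it defaults to Integer.MAX_VALUE and all targeted segments are committed in a single batch; a zero or negative batchSize is rejected with 400 BAD_REQUEST.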

PinotLLCRealtimeSegmentManager.java
@@ -1725,15 +1725,16 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin…
   * @param tableNameWithType table name with type
   * @param partitionGroupIdsToCommit comma separated list of partition group IDs to commit
   * @param segmentsToCommit comma separated list of consuming segments to commit
   * @param batchSize max number of consuming segments a server can commit at once
   * @return the set of consuming segments for which commit was initiated
   */
  public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
      @Nullable String segmentsToCommit, int batchSize) {
    IdealState idealState = getIdealState(tableNameWithType);
    Set<String> allConsumingSegments = findConsumingSegments(idealState);
    Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
        segmentsToCommit);
    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments, batchSize);
    return targetConsumingSegments;
  }

@@ -1779,7 +1780,7 @@ public PauseStatusDetails pauseConsumption(String tableNameWithType, PauseState.…
      @Nullable String comment) {
    IdealState updatedIdealState = updatePauseStateInIdealState(tableNameWithType, true, reasonCode, comment);
    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
    // Pausing keeps the pre-batching behavior: all consuming segments are committed in one unbounded batch.
    sendForceCommitMessageToServers(tableNameWithType, consumingSegments, Integer.MAX_VALUE);
    return new PauseStatusDetails(true, consumingSegments, reasonCode, comment != null ? comment
        : "Pause flag is set. Consuming segments are being committed."
            + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.",
@@ -1824,14 +1825,14 @@ public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean…
    return updatedIdealState;
  }

  private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments,
      int batchSize) {
    if (!consumingSegments.isEmpty()) {
      Criteria recipientCriteria = new Criteria();
      recipientCriteria.setInstanceName("%");
      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
      recipientCriteria.setResource(tableNameWithType);
      recipientCriteria.setSessionSpecific(true);
      ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments, batchSize);
      int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
      if (numMessagesSent > 0) {
        LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType,
  …
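
Note that batchSize is enforced per server, not cluster-wide: the controller sends one effectively fire-and-forget Helix message to each participant hosting the table (the null callback and -1 timeout in send(...) mean no reply is awaited), and each server batches only the consuming segments it hosts. With N servers, up to N * batchSize segments may therefore be committing at any one time.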

InstanceDataManager.java
@@ -187,7 +187,7 @@ void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean…
  /**
   * Immediately stop consumption and start committing the consuming segments, at most {@code batchSize} segments
   * at a time per server.
   */
  void forceCommit(String tableNameWithType, Set<String> segmentNames, int batchSize);

  /**
   * Enables the installation of a method to determine if a server is ready to serve queries.
  …

HelixInstanceDataManager.java
@@ -47,6 +47,7 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
@@ -80,6 +81,7 @@
@ThreadSafe
public class HelixInstanceDataManager implements InstanceDataManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class);
  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;

  private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>();
  // TODO: Consider making segment locks per table instead of per instance
@@ -591,24 +593,98 @@ public SegmentUploader getSegmentUploader() {
  }

  @Override
  public void forceCommit(String tableNameWithType, Set<String> segmentNames, int batchSize) {
    Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String.format(
        "Force commit is only supported for segments of realtime tables - table name: %s segment names: %s",
        tableNameWithType, segmentNames));

    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
    List<RealtimeSegmentDataManager> segmentsToCommit = getSegmentsToCommit(tableDataManager, segmentNames);

    try {
      List<List<RealtimeSegmentDataManager>> segmentBatchList = divideSegmentsInBatches(segmentsToCommit, batchSize);

      // Chain the batches so that each batch starts only after the previous one has fully committed.
      CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
      for (List<RealtimeSegmentDataManager> segmentBatchToCommit : segmentBatchList) {
        future = future.thenRun(() -> executeBatch(tableDataManager, segmentBatchToCommit));
      }

      future.join();
    } finally {
      for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentsToCommit) {
        tableDataManager.releaseSegment(realtimeSegmentDataManager);
      }
    }
  }

  private List<RealtimeSegmentDataManager> getSegmentsToCommit(TableDataManager tableDataManager,
      Set<String> segmentNames) {
    List<RealtimeSegmentDataManager> segmentsToCommit = new ArrayList<>();
    if (tableDataManager == null) {
      return segmentsToCommit;
    }

    for (String segmentName : segmentNames) {
      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
      if (segmentDataManager != null) {
        if (segmentDataManager instanceof RealtimeSegmentDataManager) {
          segmentsToCommit.add((RealtimeSegmentDataManager) segmentDataManager);
        } else {
          // Only consuming (realtime) segments can be force-committed; release anything else right away.
          tableDataManager.releaseSegment(segmentDataManager);
        }
      }
    }
    return segmentsToCommit;
  }

  private List<List<RealtimeSegmentDataManager>> divideSegmentsInBatches(
      List<RealtimeSegmentDataManager> segmentsToCommit, int batchSize) {
    List<List<RealtimeSegmentDataManager>> segmentBatchListToRet = new ArrayList<>();
    List<RealtimeSegmentDataManager> lastBatch = new ArrayList<>();

    for (RealtimeSegmentDataManager segmentDataManager : segmentsToCommit) {
      lastBatch.add(segmentDataManager);
      if (lastBatch.size() == batchSize) {
        segmentBatchListToRet.add(lastBatch);
        // Start a fresh list rather than calling clear(): clear() would also empty the batch just added.
        lastBatch = new ArrayList<>();
      }
    }

    if (!lastBatch.isEmpty()) {
      segmentBatchListToRet.add(lastBatch);
    }
    return segmentBatchListToRet;
  }

  private void executeBatch(TableDataManager tableDataManager,
      List<RealtimeSegmentDataManager> segmentBatchToCommit) {
    for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentBatchToCommit) {
      realtimeSegmentDataManager.forceCommit();
    }

    // Block until every segment in this batch shows up as ONLINE in the ideal state.
    while (!isBatchSuccessful(tableDataManager, segmentBatchToCommit)) {
      try {
        Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }

  private boolean isBatchSuccessful(TableDataManager tableDataManager,
      List<RealtimeSegmentDataManager> segmentBatchToCommit) {
    Set<String> onlineSegmentsForTable =
        HelixHelper.getOnlineSegmentsFromIdealState(_helixManager, tableDataManager.getTableName(), false);

    for (SegmentDataManager segmentDataManager : segmentBatchToCommit) {
      if (!onlineSegmentsForTable.contains(segmentDataManager.getSegmentName())) {
        return false;
      }
    }
    return true;
  }
}
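
To make the batching arithmetic concrete, a standalone sketch (hypothetical segment names, not code from this PR) that mirrors divideSegmentsInBatches and the sequential chaining:

  // Illustrative only: 7 segments with batchSize = 3 yield batches of 3, 3 and 1.
  List<String> segments = List.of("s1", "s2", "s3", "s4", "s5", "s6", "s7");
  int batchSize = 3;
  List<List<String>> batches = new ArrayList<>();
  List<String> batch = new ArrayList<>();
  for (String s : segments) {
    batch.add(s);
    if (batch.size() == batchSize) {
      batches.add(batch);
      batch = new ArrayList<>(); // fresh list, so the stored batch stays intact
    }
  }
  if (!batch.isEmpty()) {
    batches.add(batch);
  }
  // batches -> [[s1, s2, s3], [s4, s5, s6], [s7]]
  // Each batch is force-committed and polled to ONLINE before the next batch starts.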

ForceCommitMessageHandler (server-side message handler)
@@ -215,12 +215,14 @@ private class ForceCommitMessageHandler extends DefaultMessageHandler {

  private String _tableName;
  private Set<String> _segmentNames;
  private final int _batchSize;

  public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
      NotificationContext ctx) {
    super(forceCommitMessage, metrics, ctx);
    _tableName = forceCommitMessage.getTableName();
    _segmentNames = forceCommitMessage.getSegmentNames();
    _batchSize = forceCommitMessage.getBatchSize();
  }

  @Override
@@ -229,7 +231,7 @@ public HelixTaskResult handleMessage()
    HelixTaskResult helixTaskResult = new HelixTaskResult();
    _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
    try {
      _instanceDataManager.forceCommit(_tableName, _segmentNames, _batchSize);
      helixTaskResult.setSuccess(true);
    } catch (Exception e) {
      _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
  …
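
Because InstanceDataManager.forceCommit(...) now blocks until every batch has turned ONLINE (polling the ideal state every 15 seconds), handleMessage holds its Helix task thread for the full duration of the staged commit; the -1 execution timeout set on ForceCommitMessage accommodates this.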