Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return segments in COMMITTING status in the pauseStatus API for pauseless enabled tables #14908

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
Expand Down Expand Up @@ -1974,6 +1975,22 @@ private Set<String> findConsumingSegments(IdealState idealState) {
}
}
});
// For pauseless tables, a segment marked ONLINE in the ideal state may not have been committed yet.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we consider adding a new method findCommittingSegments to return the committing segments for pauseless table? I don't want to mix consuming segment and committing segment because this way it can potentially return 2 segments per partition.

Copy link
Contributor Author

@9aman 9aman Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have addressed this below: #14908 (comment)

// We rely on SegmentZkMetadata to determine whether a segment has been committed (status is DONE)
// instead of relying solely on the ideal state.
// A segment in COMMITTING state is treated as consuming for pauseStatus.
String tableNameWithType = idealState.getResourceName();
if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) {
Map<Integer, SegmentZKMetadata> metadataMap = getLatestSegmentZKMetadataMap(tableNameWithType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work? Committing segment won't be the latest segment in each partition. I think this will never find committing segment, but always IN_PROGRESS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started with an assumption that the user has issued a pause request before the status is checked.
I am not sure whether this assumption is correct.

No new consuming segments will be created for the table when the force commit message is sent as part of the pause table call.
This ensures that the last segment is :

  1. Marked CONSUMING in the IS as the commit protocol has not started.
  2. Marked ONLINE but has status COMMITTING status as the commit protocol has not finished.

I did not want to add a function findCommittingSegments here for two reasons:

  1. Correctness: an older COMMITTING segment that failed to complete commit protocol will also be returned in the pause status and I feel that's wrong indication of the pause status. The user is only concerned with the latest segment's commit status. Previously this status was derived from IS and for pauseless it derived from ZK metadata.
  2. Performance: Fetching ZK metadata for all the segments is an expensive operation.

if (metadataMap != null) {
metadataMap.values()
.stream()
.filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName()))
.filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING)
.map(SegmentZKMetadata::getSegmentName)
.forEach(consumingSegments::add);
}
}
return consumingSegments;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -57,17 +58,21 @@
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
Expand Down Expand Up @@ -1248,6 +1253,60 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

@Test
void testPauseStatus() {
// Set up a new table with 2 replicas, 5 instances, 4 partition
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
setUpNewTable(segmentManager, 2, 5, 4);

// pause the table
PauseState pauseState = new PauseState(true, PauseState.ReasonCode.ADMINISTRATIVE, "comment",
new Timestamp(CURRENT_TIME_MS).toString());
segmentManager._idealState.getRecord()
.getSimpleFields()
.put(PinotLLCRealtimeSegmentManager.PAUSE_STATE, pauseState.toJsonString());

// update the ideal state to ONLINE for the segment
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
Map<String, String> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields().get(committingSegment);
instanceStatesMap.replaceAll((k, v) -> "ONLINE");
segmentManager._idealState.getRecord().getMapFields().put(committingSegment, instanceStatesMap);

// update the segment metadata to COMMITTING
SegmentZKMetadata segmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
segmentZKMetadata.setStatus(Status.COMMITTING);
segmentManager._segmentZKMetadataMap.put(committingSegment, segmentZKMetadata);

PauseStatusDetails pauseStatusDetails =
segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME));

// assert that the segment is ONLINE and is not present as a consuming segment in the PauseStatusDetails
// as for non pauseless enabled tables the consuming segments are derived solely from ideal state
assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment)
.values().stream().findFirst().orElseThrow());
assertFalse(pauseStatusDetails.getConsumingSegments().contains(committingSegment));

// enable pauseless for table
StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(null);
streamIngestionConfig.setPauselessConsumptionEnabled(true);
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
segmentManager._tableConfig.setIngestionConfig(ingestionConfig);

pauseStatusDetails =
segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME));

// assert that the segment is ONLINE and is present as a consuming segment in the PauseStatusDetails
// as it is still COMMITTING

assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment)
.values().stream().findFirst().orElseThrow());
assertTrue(pauseStatusDetails.getConsumingSegments().contains(committingSegment));
}

@Test
public void getSegmentsYetToBeCommitted() {
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
Expand Down
Loading