-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return segments in COMMITTING status in the pauseStatus API for pauseless enabled tables #14908
base: master
Are you sure you want to change the base?
Changes from all commits
f946111
eb2535c
0c77011
16ed5e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; | ||
import org.apache.pinot.common.utils.FileUploadDownloadClient; | ||
import org.apache.pinot.common.utils.LLCSegmentName; | ||
import org.apache.pinot.common.utils.PauselessConsumptionUtils; | ||
import org.apache.pinot.common.utils.URIUtils; | ||
import org.apache.pinot.common.utils.helix.HelixHelper; | ||
import org.apache.pinot.controller.ControllerConf; | ||
|
@@ -1974,6 +1975,22 @@ private Set<String> findConsumingSegments(IdealState idealState) { | |
} | ||
} | ||
}); | ||
// For pauseless tables, a segment marked ONLINE in the ideal state may not have been committed yet. | ||
// We rely on SegmentZkMetadata to determine whether a segment has been committed (status is DONE) | ||
// instead of relying solely on the ideal state. | ||
// A segment in COMMITTING state is treated as consuming for pauseStatus. | ||
String tableNameWithType = idealState.getResourceName(); | ||
if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) { | ||
Map<Integer, SegmentZKMetadata> metadataMap = getLatestSegmentZKMetadataMap(tableNameWithType); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this work? Committing segment won't be the latest segment in each partition. I think this will never find committing segment, but always There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I started with an assumption that the user has issued a No new consuming segments will be created for the table when the
I did not want to add a function
|
||
if (metadataMap != null) { | ||
metadataMap.values() | ||
.stream() | ||
.filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName())) | ||
.filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING) | ||
.map(SegmentZKMetadata::getSegmentName) | ||
.forEach(consumingSegments::add); | ||
} | ||
} | ||
return consumingSegments; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we consider adding a new method
findCommittingSegments
to return the committing segments for pauseless table? I don't want to mix consuming segment and committing segment because this way it can potentially return 2 segments per partition.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have addressed this below: #14908 (comment)