Skip to content

Commit bb79ca7

Browse files
9amanGlen Matsushita
authored and
Glen Matsushita
committed
Pauseless ingestion without failure scenarios (apache#14741)
1 parent 9fbf0f7 commit bb79ca7

File tree

16 files changed

+841
-115
lines changed

16 files changed

+841
-115
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.utils;
20+
21+
import java.util.Optional;
22+
import javax.validation.constraints.NotNull;
23+
import org.apache.pinot.spi.config.table.TableConfig;
24+
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
25+
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
26+
27+
28+
public class PauselessConsumptionUtils {
29+
30+
private PauselessConsumptionUtils() {
31+
// Private constructor to prevent instantiation of utility class
32+
}
33+
34+
/**
35+
* Checks if pauseless consumption is enabled for the given table configuration.
36+
* Returns false if any configuration component is missing or if the flag is not set to true.
37+
*
38+
* @param tableConfig The table configuration to check. Must not be null.
39+
* @return true if pauseless consumption is explicitly enabled, false otherwise
40+
* @throws NullPointerException if tableConfig is null
41+
*/
42+
public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
43+
return Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig)
44+
.map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
45+
}
46+
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java

+63-50
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.controller.helix.core.realtime;
20+
21+
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
22+
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
23+
import org.apache.pinot.common.utils.LLCSegmentName;
24+
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
25+
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
26+
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
27+
import org.apache.pinot.spi.utils.CommonConstants;
28+
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
29+
30+
31+
public class PauselessSegmentCompletionFSM extends BlockingSegmentCompletionFSM {
32+
public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
33+
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName,
34+
SegmentZKMetadata segmentMetadata) {
35+
super(segmentManager, segmentCompletionManager, segmentName, segmentMetadata);
36+
if (segmentMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING) {
37+
StreamPartitionMsgOffsetFactory factory =
38+
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
39+
StreamPartitionMsgOffset endOffset = factory.create(segmentMetadata.getEndOffset());
40+
_state = BlockingSegmentCompletionFSMState.COMMITTED;
41+
_winningOffset = endOffset;
42+
_winner = "UNKNOWN";
43+
}
44+
}
45+
46+
@Override
47+
protected SegmentCompletionProtocol.Response committerNotifiedCommit(
48+
SegmentCompletionProtocol.Request.Params reqParams, long now) {
49+
String instanceId = reqParams.getInstanceId();
50+
StreamPartitionMsgOffset offset = _streamPartitionMsgOffsetFactory.create(reqParams.getStreamPartitionMsgOffset());
51+
SegmentCompletionProtocol.Response response = checkBadCommitRequest(instanceId, offset, now);
52+
if (response != null) {
53+
return response;
54+
}
55+
try {
56+
CommittingSegmentDescriptor committingSegmentDescriptor =
57+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
58+
LOGGER.info(
59+
"Starting to commit changes to ZK and ideal state for the segment:{} during pauseles ingestion as the "
60+
+ "leader has been selected", _segmentName);
61+
_segmentManager.commitSegmentStartMetadata(
62+
TableNameBuilder.REALTIME.tableNameWithType(_segmentName.getTableName()), committingSegmentDescriptor);
63+
} catch (Exception e) {
64+
// this aims to handle the failures during commitSegmentStartMetadata
65+
// we abort the state machine to allow commit protocol to start from the beginning
66+
// the server would then retry the commit protocol from the start
67+
return abortAndReturnFailed();
68+
}
69+
_logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset);
70+
_state = BlockingSegmentCompletionFSMState.COMMITTER_UPLOADING;
71+
long commitTimeMs = now - _startTimeMs;
72+
if (commitTimeMs > _initialCommitTimeMs) {
73+
// We assume that the commit time holds for all partitions. It is possible, though, that one partition
74+
// commits at a lower time than another partition, and the two partitions are going simultaneously,
75+
// and we may not get the maximum value all the time.
76+
_segmentCompletionManager.setCommitTime(_segmentName.getTableName(), commitTimeMs);
77+
}
78+
return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
79+
}
80+
81+
@Override
82+
public SegmentCompletionProtocol.Response extendBuildTime(final String instanceId,
83+
final StreamPartitionMsgOffset offset, final int extTimeSec) {
84+
final long now = _segmentCompletionManager.getCurrentTimeMs();
85+
synchronized (this) {
86+
_logger.info("Processing extendBuildTime({}, {}, {})", instanceId, offset, extTimeSec);
87+
switch (_state) {
88+
case PARTIAL_CONSUMING:
89+
case HOLDING:
90+
case COMMITTER_DECIDED:
91+
case COMMITTER_NOTIFIED:
92+
return fail(instanceId, offset);
93+
case COMMITTER_UPLOADING:
94+
return committerNotifiedExtendBuildTime(instanceId, offset, extTimeSec, now);
95+
case COMMITTING:
96+
case COMMITTED:
97+
case ABORTED:
98+
default:
99+
return fail(instanceId, offset);
100+
}
101+
}
102+
}
103+
104+
@Override
105+
protected void commitSegmentMetadata(String realtimeTableName,
106+
CommittingSegmentDescriptor committingSegmentDescriptor) {
107+
_segmentManager.commitSegmentEndMetadata(realtimeTableName, committingSegmentDescriptor);
108+
}
109+
110+
@Override
111+
protected SegmentCompletionProtocol.Response handleNonWinnerCase(String instanceId, StreamPartitionMsgOffset offset) {
112+
// Common case: A different instance is reporting.
113+
if (offset.compareTo(_winningOffset) == 0) {
114+
// The winner has already updated the segment's ZK metadata for the committing segment.
115+
// Additionally, a new consuming segment has been created for pauseless ingestion.
116+
// Return "keep" to allow the server to build the segment and begin ingestion for the new consuming segment.
117+
return keep(instanceId, offset);
118+
} else if (offset.compareTo(_winningOffset) < 0) {
119+
return catchup(instanceId, offset);
120+
} else {
121+
// We have not yet committed, so ask the new responder to hold. They may be the new leader in case the
122+
// committer fails.
123+
return hold(instanceId, offset);
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)