Skip to content

Commit 431baa7

Browse files
committed
[FLINK-38464] Introduce OrderedMultiSetState
1 parent 5ff7c67 commit 431baa7

23 files changed

+3111
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.sequencedmultisetstate;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
25+
import org.apache.flink.util.function.FunctionWithException;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.io.IOException;
31+
import java.util.Iterator;
32+
import java.util.function.Function;
33+
34+
import static org.apache.flink.util.Preconditions.checkArgument;
35+
36+
/**
37+
* An {@link SequencedMultiSetState} that switches dynamically between {@link
38+
* ValueStateMultiSetState} and {@link LinkedMultiSetState} based on the number of elements.
39+
*/
40+
class AdaptiveSequencedMultiSetState implements SequencedMultiSetState<RowData> {
41+
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSequencedMultiSetState.class);
42+
43+
private final ValueStateMultiSetState smallState;
44+
private final LinkedMultiSetState largeState;
45+
private final long switchToLargeThreshold;
46+
private final long switchToSmallThreshold;
47+
48+
AdaptiveSequencedMultiSetState(
49+
ValueStateMultiSetState smallState,
50+
LinkedMultiSetState largeState,
51+
long switchToLargeThreshold,
52+
long switchToSmallThreshold) {
53+
checkArgument(switchToLargeThreshold > switchToSmallThreshold);
54+
this.smallState = smallState;
55+
this.largeState = largeState;
56+
this.switchToLargeThreshold = switchToLargeThreshold;
57+
this.switchToSmallThreshold = switchToSmallThreshold;
58+
LOG.info(
59+
"Created {} with thresholds: {}=>large, {}=>small",
60+
this.getClass().getSimpleName(),
61+
switchToLargeThreshold,
62+
switchToSmallThreshold);
63+
}
64+
65+
@Override
66+
public StateChangeInfo<RowData> add(RowData element, long timestamp) throws Exception {
67+
return execute(
68+
state -> state.add(element, timestamp), StateChangeInfo::getSizeAfter, "add");
69+
}
70+
71+
@Override
72+
public StateChangeInfo<RowData> append(RowData element, long timestamp) throws Exception {
73+
return execute(
74+
state -> state.append(element, timestamp), StateChangeInfo::getSizeAfter, "append");
75+
}
76+
77+
@Override
78+
public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
79+
if (smallState.isEmpty()) {
80+
return largeState.iterator();
81+
} else {
82+
return smallState.iterator();
83+
}
84+
}
85+
86+
@Override
87+
public boolean isEmpty() throws IOException {
88+
// large state check is faster
89+
return largeState.isEmpty() && smallState.isEmpty();
90+
}
91+
92+
public StateChangeInfo<RowData> remove(RowData element) throws Exception {
93+
return execute(state -> state.remove(element), StateChangeInfo::getSizeAfter, "remove");
94+
}
95+
96+
@Override
97+
public void clear() {
98+
clearCache();
99+
smallState.clear();
100+
largeState.clear();
101+
}
102+
103+
@Override
104+
public void loadCache() throws IOException {
105+
smallState.loadCache();
106+
largeState.loadCache();
107+
}
108+
109+
@Override
110+
public void clearCache() {
111+
smallState.clearCache();
112+
largeState.clearCache();
113+
}
114+
115+
private <T> T execute(
116+
FunctionWithException<SequencedMultiSetState<RowData>, T, Exception> stateOp,
117+
Function<T, Long> getNewSize,
118+
String action)
119+
throws Exception {
120+
121+
final boolean isUsingLarge = isIsUsingLargeState();
122+
123+
// start with small state, i.e. choose smallState when both are empty
124+
SequencedMultiSetState<RowData> currentState = isUsingLarge ? largeState : smallState;
125+
SequencedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
126+
127+
T result = stateOp.apply(currentState);
128+
final long sizeAfter = getNewSize.apply(result);
129+
130+
final boolean thresholdReached =
131+
isUsingLarge
132+
? sizeAfter <= switchToSmallThreshold
133+
: sizeAfter >= switchToLargeThreshold;
134+
135+
if (thresholdReached) {
136+
LOG.debug(
137+
"Switch {} -> {} because '{}' resulted in state size reaching {} elements",
138+
currentState.getClass().getSimpleName(),
139+
otherState.getClass().getSimpleName(),
140+
action,
141+
sizeAfter);
142+
switchState(currentState, otherState);
143+
}
144+
145+
clearCache();
146+
return result;
147+
}
148+
149+
@VisibleForTesting
150+
boolean isIsUsingLargeState() throws IOException {
151+
smallState.loadCache();
152+
if (!smallState.isEmpty()) {
153+
return false;
154+
}
155+
largeState.loadCache();
156+
return !largeState.isEmpty();
157+
}
158+
159+
private void switchState(
160+
SequencedMultiSetState<RowData> src, SequencedMultiSetState<RowData> dst)
161+
throws Exception {
162+
Iterator<Tuple2<RowData, Long>> it = src.iterator();
163+
while (it.hasNext()) {
164+
Tuple2<RowData, Long> next = it.next();
165+
dst.append(next.f0, next.f1);
166+
}
167+
src.clear();
168+
}
169+
170+
public static AdaptiveSequencedMultiSetState create(
171+
SequencedMultiSetStateConfig sequencedMultiSetStateConfig,
172+
String backendTypeIdentifier,
173+
ValueStateMultiSetState smallState,
174+
LinkedMultiSetState largeState) {
175+
return new AdaptiveSequencedMultiSetState(
176+
smallState,
177+
largeState,
178+
sequencedMultiSetStateConfig
179+
.getAdaptiveHighThresholdOverride()
180+
.orElse(
181+
isHeap(backendTypeIdentifier)
182+
? ADAPTIVE_HEAP_HIGH_THRESHOLD
183+
: ADAPTIVE_ROCKSDB_HIGH_THRESHOLD),
184+
sequencedMultiSetStateConfig
185+
.getAdaptiveLowThresholdOverride()
186+
.orElse(
187+
isHeap(backendTypeIdentifier)
188+
? ADAPTIVE_HEAP_LOW_THRESHOLD
189+
: ADAPTIVE_ROCKSDB_LOW_THRESHOLD));
190+
}
191+
192+
private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
193+
private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
194+
private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
195+
private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;
196+
197+
private static boolean isHeap(String stateBackend) {
198+
String trim = stateBackend.trim();
199+
return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
200+
}
201+
}

0 commit comments

Comments
 (0)