Skip to content

Commit 2fb3bb9

Browse files
committed
[FLINK-38464] Introduce OrderedMultiSetState
1 parent b571ad5 commit 2fb3bb9

File tree

14 files changed

+1964
-0
lines changed

14 files changed

+1964
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.orderedmultisetstate;
20+
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
import org.apache.flink.api.java.tuple.Tuple3;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.util.function.FunctionWithException;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.IOException;
30+
import java.util.Iterator;
31+
import java.util.Optional;
32+
import java.util.function.Function;
33+
34+
import static org.apache.flink.util.Preconditions.checkArgument;
35+
36+
/**
37+
* An {@link OrderedMultiSetState} that switches dynamically between {@link ValueStateMultiSetState}
38+
* and {@link LinkedMultiSetState} based on the number of elements.
39+
*/
40+
class AdaptiveOrderedMultiSetState implements OrderedMultiSetState<RowData> {
41+
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveOrderedMultiSetState.class);
42+
43+
private final OrderedMultiSetState<RowData> smallState;
44+
private final OrderedMultiSetState<RowData> largeState;
45+
private final long switchToLargeThreshold;
46+
private final long switchToSmallThreshold;
47+
48+
AdaptiveOrderedMultiSetState(
49+
OrderedMultiSetState<RowData> smallState,
50+
OrderedMultiSetState<RowData> largeState,
51+
long switchToLargeThreshold,
52+
long switchToSmallThreshold) {
53+
checkArgument(switchToLargeThreshold > switchToSmallThreshold);
54+
this.smallState = smallState;
55+
this.largeState = largeState;
56+
this.switchToLargeThreshold = switchToLargeThreshold;
57+
this.switchToSmallThreshold = switchToSmallThreshold;
58+
LOG.info(
59+
"Created {} with thresholds: {}=>large, {}=>small",
60+
this.getClass().getSimpleName(),
61+
switchToLargeThreshold,
62+
switchToSmallThreshold);
63+
}
64+
65+
@Override
66+
public SizeChangeInfo add(RowData element, long timestamp) throws Exception {
67+
return execute(state -> state.add(element, timestamp), Function.identity(), "add");
68+
}
69+
70+
@Override
71+
public SizeChangeInfo append(RowData element, long timestamp) throws Exception {
72+
return execute(state -> state.append(element, timestamp), Function.identity(), "append");
73+
}
74+
75+
@Override
76+
public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
77+
if (smallState.isEmpty()) {
78+
return largeState.iterator();
79+
} else {
80+
return smallState.iterator();
81+
}
82+
}
83+
84+
@Override
85+
public boolean isEmpty() throws IOException {
86+
// large state check is faster
87+
return largeState.isEmpty() || smallState.isEmpty();
88+
}
89+
90+
@Override
91+
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData element)
92+
throws Exception {
93+
return execute(state -> state.remove(element), ret -> ret.f2, "remove");
94+
}
95+
96+
@Override
97+
public void clear() {
98+
clearCache();
99+
smallState.clear();
100+
largeState.clear();
101+
}
102+
103+
@Override
104+
public void loadCache() throws IOException {
105+
smallState.loadCache();
106+
largeState.loadCache();
107+
}
108+
109+
@Override
110+
public void clearCache() {
111+
smallState.clearCache();
112+
largeState.clearCache();
113+
}
114+
115+
private <T> T execute(
116+
FunctionWithException<OrderedMultiSetState<RowData>, T, Exception> stateOp,
117+
Function<T, SizeChangeInfo> getSizeChangeInfo,
118+
String action)
119+
throws Exception {
120+
121+
final boolean isUsingLarge = isEmptyCaching(smallState) && !isEmptyCaching(largeState);
122+
123+
// start with small state, i.e. choose smallState when both are empty
124+
OrderedMultiSetState<RowData> currentState = isUsingLarge ? largeState : smallState;
125+
OrderedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
126+
127+
T result = stateOp.apply(currentState);
128+
SizeChangeInfo sizeInfo = getSizeChangeInfo.apply(result);
129+
130+
final boolean thresholdReached =
131+
isUsingLarge
132+
? sizeInfo.sizeAfter <= switchToSmallThreshold
133+
: sizeInfo.sizeAfter >= switchToLargeThreshold;
134+
135+
if (thresholdReached) {
136+
LOG.debug(
137+
"Switch {} -> {} because '{}' resulted in state size change {} -> {}",
138+
currentState.getClass().getSimpleName(),
139+
otherState.getClass().getSimpleName(),
140+
action,
141+
sizeInfo.sizeBefore,
142+
sizeInfo.sizeAfter);
143+
switchState(currentState, otherState);
144+
}
145+
146+
clearCache();
147+
return result;
148+
}
149+
150+
private boolean isEmptyCaching(OrderedMultiSetState<RowData> state) throws IOException {
151+
state.loadCache();
152+
return state.isEmpty();
153+
}
154+
155+
private void switchState(OrderedMultiSetState<RowData> src, OrderedMultiSetState<RowData> dst)
156+
throws Exception {
157+
Iterator<Tuple2<RowData, Long>> it = src.iterator();
158+
while (it.hasNext()) {
159+
Tuple2<RowData, Long> next = it.next();
160+
dst.append(next.f0, next.f1);
161+
}
162+
src.clear();
163+
}
164+
165+
public static AdaptiveOrderedMultiSetState create(
166+
StateSettings stateSettings,
167+
String backendTypeIdentifier,
168+
OrderedMultiSetState<RowData> smallState,
169+
OrderedMultiSetState<RowData> largeState) {
170+
return new AdaptiveOrderedMultiSetState(
171+
smallState,
172+
largeState,
173+
stateSettings
174+
.getAdaptiveHighThresholdOverride()
175+
.orElse(
176+
isHeap(backendTypeIdentifier)
177+
? ADAPTIVE_HEAP_HIGH_THRESHOLD
178+
: ADAPTIVE_ROCKSDB_HIGH_THRESHOLD),
179+
stateSettings
180+
.getAdaptiveLowThresholdOverride()
181+
.orElse(
182+
isHeap(backendTypeIdentifier)
183+
? ADAPTIVE_HEAP_LOW_THRESHOLD
184+
: ADAPTIVE_ROCKSDB_LOW_THRESHOLD));
185+
}
186+
187+
private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
188+
private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
189+
private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
190+
private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;
191+
192+
private static boolean isHeap(String stateBackend) {
193+
String trim = stateBackend.trim();
194+
return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
195+
}
196+
}

0 commit comments

Comments
 (0)