Skip to content

Commit bc1279f

Browse files
committed
[FLINK-38464] Introduce OrderedMultiSetState
1 parent 40e2269 commit bc1279f

23 files changed

+2988
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.sequencedmultisetstate;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.api.java.tuple.Tuple3;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
26+
import org.apache.flink.util.function.FunctionWithException;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.io.IOException;
32+
import java.util.Iterator;
33+
import java.util.Optional;
34+
import java.util.function.Function;
35+
36+
import static org.apache.flink.util.Preconditions.checkArgument;
37+
38+
/**
39+
* An {@link SequencedMultiSetState} that switches dynamically between {@link
40+
* ValueStateMultiSetState} and {@link LinkedMultiSetState} based on the number of elements.
41+
*/
42+
class AdaptiveSequencedMultiSetState implements SequencedMultiSetState<RowData> {
43+
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSequencedMultiSetState.class);
44+
45+
private final ValueStateMultiSetState smallState;
46+
private final LinkedMultiSetState largeState;
47+
private final long switchToLargeThreshold;
48+
private final long switchToSmallThreshold;
49+
50+
AdaptiveSequencedMultiSetState(
51+
ValueStateMultiSetState smallState,
52+
LinkedMultiSetState largeState,
53+
long switchToLargeThreshold,
54+
long switchToSmallThreshold) {
55+
checkArgument(switchToLargeThreshold > switchToSmallThreshold);
56+
this.smallState = smallState;
57+
this.largeState = largeState;
58+
this.switchToLargeThreshold = switchToLargeThreshold;
59+
this.switchToSmallThreshold = switchToSmallThreshold;
60+
LOG.info(
61+
"Created {} with thresholds: {}=>large, {}=>small",
62+
this.getClass().getSimpleName(),
63+
switchToLargeThreshold,
64+
switchToSmallThreshold);
65+
}
66+
67+
@Override
68+
public long add(RowData element, long timestamp) throws Exception {
69+
return execute(state -> state.add(element, timestamp), Function.identity(), "add");
70+
}
71+
72+
@Override
73+
public long append(RowData element, long timestamp) throws Exception {
74+
return execute(state -> state.append(element, timestamp), Function.identity(), "append");
75+
}
76+
77+
@Override
78+
public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
79+
if (smallState.isEmpty()) {
80+
return largeState.iterator();
81+
} else {
82+
return smallState.iterator();
83+
}
84+
}
85+
86+
@Override
87+
public boolean isEmpty() throws IOException {
88+
// large state check is faster
89+
return largeState.isEmpty() && smallState.isEmpty();
90+
}
91+
92+
@Override
93+
public Tuple3<Long, RemovalResultType, Optional<RowData>> remove(RowData element)
94+
throws Exception {
95+
return execute(state -> state.remove(element), ret -> ret.f0, "remove");
96+
}
97+
98+
@Override
99+
public void clear() {
100+
clearCache();
101+
smallState.clear();
102+
largeState.clear();
103+
}
104+
105+
@Override
106+
public void loadCache() throws IOException {
107+
smallState.loadCache();
108+
largeState.loadCache();
109+
}
110+
111+
@Override
112+
public void clearCache() {
113+
smallState.clearCache();
114+
largeState.clearCache();
115+
}
116+
117+
private <T> T execute(
118+
FunctionWithException<SequencedMultiSetState<RowData>, T, Exception> stateOp,
119+
Function<T, Long> getNewSize,
120+
String action)
121+
throws Exception {
122+
123+
final boolean isUsingLarge = isIsUsingLargeState();
124+
125+
// start with small state, i.e. choose smallState when both are empty
126+
SequencedMultiSetState<RowData> currentState = isUsingLarge ? largeState : smallState;
127+
SequencedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
128+
129+
T result = stateOp.apply(currentState);
130+
final long sizeAfter = getNewSize.apply(result);
131+
132+
final boolean thresholdReached =
133+
isUsingLarge
134+
? sizeAfter <= switchToSmallThreshold
135+
: sizeAfter >= switchToLargeThreshold;
136+
137+
if (thresholdReached) {
138+
LOG.debug(
139+
"Switch {} -> {} because '{}' resulted in state size reaching {} elements",
140+
currentState.getClass().getSimpleName(),
141+
otherState.getClass().getSimpleName(),
142+
action,
143+
sizeAfter);
144+
switchState(currentState, otherState);
145+
}
146+
147+
clearCache();
148+
return result;
149+
}
150+
151+
@VisibleForTesting
152+
boolean isIsUsingLargeState() throws IOException {
153+
smallState.loadCache();
154+
if (!smallState.isEmpty()) {
155+
return false;
156+
}
157+
largeState.loadCache();
158+
return !largeState.isEmpty();
159+
}
160+
161+
private void switchState(
162+
SequencedMultiSetState<RowData> src, SequencedMultiSetState<RowData> dst)
163+
throws Exception {
164+
Iterator<Tuple2<RowData, Long>> it = src.iterator();
165+
while (it.hasNext()) {
166+
Tuple2<RowData, Long> next = it.next();
167+
dst.append(next.f0, next.f1);
168+
}
169+
src.clear();
170+
}
171+
172+
public static AdaptiveSequencedMultiSetState create(
173+
SequencedMultiSetStateConfig sequencedMultiSetStateConfig,
174+
String backendTypeIdentifier,
175+
ValueStateMultiSetState smallState,
176+
LinkedMultiSetState largeState) {
177+
return new AdaptiveSequencedMultiSetState(
178+
smallState,
179+
largeState,
180+
sequencedMultiSetStateConfig
181+
.getAdaptiveHighThresholdOverride()
182+
.orElse(
183+
isHeap(backendTypeIdentifier)
184+
? ADAPTIVE_HEAP_HIGH_THRESHOLD
185+
: ADAPTIVE_ROCKSDB_HIGH_THRESHOLD),
186+
sequencedMultiSetStateConfig
187+
.getAdaptiveLowThresholdOverride()
188+
.orElse(
189+
isHeap(backendTypeIdentifier)
190+
? ADAPTIVE_HEAP_LOW_THRESHOLD
191+
: ADAPTIVE_ROCKSDB_LOW_THRESHOLD));
192+
}
193+
194+
private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
195+
private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
196+
private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
197+
private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;
198+
199+
private static boolean isHeap(String stateBackend) {
200+
String trim = stateBackend.trim();
201+
return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
202+
}
203+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.sequencedmultisetstate;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.functions.RuntimeContext;
23+
import org.apache.flink.api.java.tuple.Tuple2;
24+
import org.apache.flink.api.java.tuple.Tuple3;
25+
import org.apache.flink.table.data.RowData;
26+
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
27+
28+
import java.io.IOException;
29+
import java.util.Iterator;
30+
import java.util.Optional;
31+
32+
/**
33+
* This class represents an interface for managing an ordered multi-set state in Apache Flink. It
34+
* provides methods to add, append, and remove elements while maintaining insertion order.
35+
*
36+
* <p>The state supports two types of semantics for adding elements:
37+
*
38+
* <ul>
39+
* <li><b>Normal Set Semantics:</b> Replaces an existing matching element with the new one.
40+
* <li><b>Multi-Set Semantics:</b> Appends the new element, allowing duplicates.
41+
* </ul>
42+
*
43+
* <p>Removal operations are supported with different result types, indicating the outcome of the
44+
* removal process, such as whether all elements were removed, the last added element was removed,
45+
* or no elements were removed.
46+
*
47+
* @param <T> The type of elements stored in the state.
48+
*/
49+
@Internal
50+
public interface SequencedMultiSetState<T> {
51+
52+
/**
53+
* Add the given element using a normal (non-multi) set semantics: if a matching element exists
54+
* already, replace it (the timestamp is updated).
55+
*
56+
* @return state size (the number of items) after this operation
57+
*/
58+
long add(T element, long timestamp) throws Exception;
59+
60+
/**
61+
* Add the given element using a multi-set semantics, i.e. append.
62+
*
63+
* @return state size (the number of items) after this operation
64+
*/
65+
long append(T element, long timestamp) throws Exception;
66+
67+
/** Get iterator over all remaining elements and their timestamps, in order of insertion. */
68+
Iterator<Tuple2<T, Long>> iterator() throws Exception;
69+
70+
/** Tells whether any state exists (in the given key context). */
71+
boolean isEmpty() throws IOException;
72+
73+
/**
74+
* Remove the given element. If there are multiple instances of the same element, remove the
75+
* first one in insertion order.
76+
*
77+
* @return new size, {@link RemovalResultType}, and optionally an item associated with it
78+
*/
79+
Tuple3<Long, RemovalResultType, Optional<T>> remove(T element) throws Exception;
80+
81+
/** Clear the state (in the current key context). */
82+
void clear();
83+
84+
/** Load cache. */
85+
void loadCache() throws IOException;
86+
87+
/** Clear caches. */
88+
void clearCache();
89+
90+
/** Removal Result Type. */
91+
enum RemovalResultType {
92+
/**
93+
* Nothing was removed (e.g. as a result of TTL or not matching key), the result will not
94+
* contain any elements.
95+
*/
96+
NOTHING_REMOVED,
97+
/** All elements were removed. The result will contain the last removed element. */
98+
ALL_REMOVED,
99+
/**
100+
* The most recently added element was removed. The result will contain the element added
101+
* before it.
102+
*/
103+
REMOVED_LAST_ADDED,
104+
/**
105+
* An element was removed, it was not the most recently added, there are more elements. The
106+
* result will not contain any elements
107+
*/
108+
REMOVED_OTHER
109+
}
110+
111+
enum Strategy {
112+
VALUE_STATE,
113+
MAP_STATE,
114+
ADAPTIVE
115+
}
116+
117+
static SequencedMultiSetState<RowData> create(
118+
SequencedMultiSetStateContext parameters,
119+
RuntimeContext ctx,
120+
String backendTypeIdentifier) {
121+
switch (parameters.config.getStrategy()) {
122+
case MAP_STATE:
123+
return LinkedMultiSetState.create(parameters, ctx);
124+
case VALUE_STATE:
125+
return ValueStateMultiSetState.create(parameters, ctx);
126+
case ADAPTIVE:
127+
return AdaptiveSequencedMultiSetState.create(
128+
parameters.config,
129+
backendTypeIdentifier,
130+
ValueStateMultiSetState.create(parameters, ctx),
131+
LinkedMultiSetState.create(parameters, ctx));
132+
default:
133+
throw new UnsupportedOperationException(parameters.config.getStrategy().name());
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)