Skip to content

Commit c87cf49

Browse files
committed
[FLINK-38464] Introduce OrderedMultiSetState
1 parent b571ad5 commit c87cf49

File tree

15 files changed

+1990
-0
lines changed

15 files changed

+1990
-0
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.orderedmultisetstate;
20+
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
import org.apache.flink.api.java.tuple.Tuple3;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.table.runtime.orderedmultisetstate.linked.LinkedMultiSetState;
25+
import org.apache.flink.util.function.FunctionWithException;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.io.IOException;
31+
import java.util.Iterator;
32+
import java.util.Optional;
33+
import java.util.function.Function;
34+
35+
import static org.apache.flink.util.Preconditions.checkArgument;
36+
37+
/**
38+
* An {@link OrderedMultiSetState} that switches dynamically between {@link ValueStateMultiSetState}
39+
* and {@link LinkedMultiSetState} based on the number of elements.
40+
*/
41+
class AdaptiveOrderedMultiSetState implements OrderedMultiSetState<RowData> {
42+
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveOrderedMultiSetState.class);
43+
44+
private final OrderedMultiSetState<RowData> smallState;
45+
private final OrderedMultiSetState<RowData> largeState;
46+
private final long switchToLargeThreshold;
47+
private final long switchToSmallThreshold;
48+
49+
AdaptiveOrderedMultiSetState(
50+
OrderedMultiSetState<RowData> smallState,
51+
OrderedMultiSetState<RowData> largeState,
52+
long switchToLargeThreshold,
53+
long switchToSmallThreshold) {
54+
checkArgument(switchToLargeThreshold > switchToSmallThreshold);
55+
this.smallState = smallState;
56+
this.largeState = largeState;
57+
this.switchToLargeThreshold = switchToLargeThreshold;
58+
this.switchToSmallThreshold = switchToSmallThreshold;
59+
LOG.info(
60+
"Created {} with thresholds: {}=>large, {}=>small",
61+
this.getClass().getSimpleName(),
62+
switchToLargeThreshold,
63+
switchToSmallThreshold);
64+
}
65+
66+
@Override
67+
public SizeChangeInfo add(RowData element, long timestamp) throws Exception {
68+
return execute(state -> state.add(element, timestamp), Function.identity(), "add");
69+
}
70+
71+
@Override
72+
public SizeChangeInfo append(RowData element, long timestamp) throws Exception {
73+
return execute(state -> state.append(element, timestamp), Function.identity(), "append");
74+
}
75+
76+
@Override
77+
public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
78+
if (smallState.isEmpty()) {
79+
return largeState.iterator();
80+
} else {
81+
return smallState.iterator();
82+
}
83+
}
84+
85+
@Override
86+
public boolean isEmpty() throws IOException {
87+
// large state check is faster
88+
return largeState.isEmpty() || smallState.isEmpty();
89+
}
90+
91+
@Override
92+
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData element)
93+
throws Exception {
94+
return execute(state -> state.remove(element), ret -> ret.f2, "remove");
95+
}
96+
97+
@Override
98+
public void clear() {
99+
clearCache();
100+
smallState.clear();
101+
largeState.clear();
102+
}
103+
104+
@Override
105+
public void loadCache() throws IOException {
106+
smallState.loadCache();
107+
largeState.loadCache();
108+
}
109+
110+
@Override
111+
public void clearCache() {
112+
smallState.clearCache();
113+
largeState.clearCache();
114+
}
115+
116+
private <T> T execute(
117+
FunctionWithException<OrderedMultiSetState<RowData>, T, Exception> stateOp,
118+
Function<T, SizeChangeInfo> getSizeChangeInfo,
119+
String action)
120+
throws Exception {
121+
122+
final boolean isUsingLarge = isEmptyCaching(smallState) && !isEmptyCaching(largeState);
123+
124+
// start with small state, i.e. choose smallState when both are empty
125+
OrderedMultiSetState<RowData> currentState = isUsingLarge ? largeState : smallState;
126+
OrderedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
127+
128+
T result = stateOp.apply(currentState);
129+
SizeChangeInfo sizeInfo = getSizeChangeInfo.apply(result);
130+
131+
final boolean thresholdReached =
132+
isUsingLarge
133+
? sizeInfo.sizeAfter <= switchToSmallThreshold
134+
: sizeInfo.sizeAfter >= switchToLargeThreshold;
135+
136+
if (thresholdReached) {
137+
LOG.debug(
138+
"Switch {} -> {} because '{}' resulted in state size change {} -> {}",
139+
currentState.getClass().getSimpleName(),
140+
otherState.getClass().getSimpleName(),
141+
action,
142+
sizeInfo.sizeBefore,
143+
sizeInfo.sizeAfter);
144+
switchState(currentState, otherState);
145+
}
146+
147+
clearCache();
148+
return result;
149+
}
150+
151+
private boolean isEmptyCaching(OrderedMultiSetState<RowData> state) throws IOException {
152+
state.loadCache();
153+
return state.isEmpty();
154+
}
155+
156+
private void switchState(OrderedMultiSetState<RowData> src, OrderedMultiSetState<RowData> dst)
157+
throws Exception {
158+
Iterator<Tuple2<RowData, Long>> it = src.iterator();
159+
while (it.hasNext()) {
160+
Tuple2<RowData, Long> next = it.next();
161+
dst.append(next.f0, next.f1);
162+
}
163+
src.clear();
164+
}
165+
166+
public static AdaptiveOrderedMultiSetState create(
167+
OrderedMultiSetStateConfig orderedMultiSetStateConfig,
168+
String backendTypeIdentifier,
169+
OrderedMultiSetState<RowData> smallState,
170+
OrderedMultiSetState<RowData> largeState) {
171+
return new AdaptiveOrderedMultiSetState(
172+
smallState,
173+
largeState,
174+
orderedMultiSetStateConfig
175+
.getAdaptiveHighThresholdOverride()
176+
.orElse(
177+
isHeap(backendTypeIdentifier)
178+
? ADAPTIVE_HEAP_HIGH_THRESHOLD
179+
: ADAPTIVE_ROCKSDB_HIGH_THRESHOLD),
180+
orderedMultiSetStateConfig
181+
.getAdaptiveLowThresholdOverride()
182+
.orElse(
183+
isHeap(backendTypeIdentifier)
184+
? ADAPTIVE_HEAP_LOW_THRESHOLD
185+
: ADAPTIVE_ROCKSDB_LOW_THRESHOLD));
186+
}
187+
188+
private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400;
189+
private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300;
190+
private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50;
191+
private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40;
192+
193+
private static boolean isHeap(String stateBackend) {
194+
String trim = stateBackend.trim();
195+
return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
196+
}
197+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.orderedmultisetstate;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.annotation.Internal;
23+
import org.apache.flink.api.common.functions.RuntimeContext;
24+
import org.apache.flink.api.java.tuple.Tuple2;
25+
import org.apache.flink.api.java.tuple.Tuple3;
26+
import org.apache.flink.table.data.RowData;
27+
import org.apache.flink.table.runtime.orderedmultisetstate.linked.LinkedMultiSetState;
28+
29+
import java.io.IOException;
30+
import java.util.Iterator;
31+
import java.util.Optional;
32+
33+
/**
34+
* This class represents an interface for managing an ordered multi-set state in Apache Flink. It
35+
* provides methods to add, append, and remove elements while maintaining insertion order.
36+
*
37+
* <p>The state supports two types of semantics for adding elements:
38+
*
39+
* <ul>
40+
* <li><b>Normal Set Semantics:</b> Replaces an existing matching element with the new one.
41+
* <li><b>Multi-Set Semantics:</b> Appends the new element, allowing duplicates.
42+
* </ul>
43+
*
44+
* <p>Removal operations are supported with different result types, indicating the outcome of the
45+
* removal process, such as whether all elements were removed, the last added element was removed,
46+
* or no elements were removed.
47+
*
48+
* @param <T> The type of elements stored in the state.
49+
*/
50+
@Internal
51+
@Experimental
52+
public interface OrderedMultiSetState<T> {
53+
54+
/**
55+
* Add the given element using a normal (non-multi) set semantics: if a matching element exists
56+
* already, replace it (the timestamp is updated).
57+
*/
58+
SizeChangeInfo add(T element, long timestamp) throws Exception;
59+
60+
/** Add the given element using a multi-set semantics, i.e. append. */
61+
SizeChangeInfo append(T element, long timestamp) throws Exception;
62+
63+
/** Get iterator over all remaining elements and their timestamps, in order of insertion. */
64+
Iterator<Tuple2<T, Long>> iterator() throws Exception;
65+
66+
/** Tells whether any state exists (in the given key context). */
67+
boolean isEmpty() throws IOException;
68+
69+
/**
70+
* Remove the given element. If there are multiple instances of the same element, remove the
71+
* first one in insertion order.
72+
*/
73+
Tuple3<RemovalResultType, Optional<T>, SizeChangeInfo> remove(T element) throws Exception;
74+
75+
/** Clear the state (in the current key context). */
76+
void clear();
77+
78+
/** Load cache. */
79+
void loadCache() throws IOException;
80+
81+
/** Clear caches. */
82+
void clearCache();
83+
84+
/** Removal Result Type. */
85+
enum RemovalResultType {
86+
/**
87+
* Nothing was removed (e.g. as a result of TTL or not matching key), the result will not
88+
* contain any elements.
89+
*/
90+
NOTHING_REMOVED,
91+
/** All elements were removed. The result will contain the last removed element. */
92+
ALL_REMOVED,
93+
/**
94+
* The most recently added element was removed. The result will contain the element added
95+
* before it.
96+
*/
97+
REMOVED_LAST_ADDED,
98+
/**
99+
* An element was removed, it was not the most recently added, there are more elements. The
100+
* result will not contain any elements
101+
*/
102+
REMOVED_OTHER
103+
}
104+
105+
enum Strategy {
106+
VALUE_STATE,
107+
MAP_STATE,
108+
ADAPTIVE
109+
}
110+
111+
/**
112+
* Represents the change in size of a multi-set before and after an operation.
113+
*
114+
* <p>This class is used to track the size of the multi-set state before and after a
115+
* modification, such as adding or removing elements.
116+
*
117+
* <p>Fields:
118+
*
119+
* <ul>
120+
* <li>{@code sizeBefore}: The size of the multi-set before the operation.
121+
* <li>{@code sizeAfter}: The size of the multi-set after the operation.
122+
* </ul>
123+
*
124+
* <p>This class is immutable and provides a simple way to encapsulate size change information.
125+
*/
126+
class SizeChangeInfo {
127+
public final long sizeBefore;
128+
public final long sizeAfter;
129+
130+
public SizeChangeInfo(long sizeBefore, long sizeAfter) {
131+
this.sizeBefore = sizeBefore;
132+
this.sizeAfter = sizeAfter;
133+
}
134+
135+
public boolean wasEmpty() {
136+
return sizeBefore == 0;
137+
}
138+
139+
public boolean isEmpty() {
140+
return sizeAfter == 0;
141+
}
142+
143+
@Override
144+
public String toString() {
145+
return "SizeChangeInfo{"
146+
+ "sizeBefore="
147+
+ sizeBefore
148+
+ ", sizeAfter="
149+
+ sizeAfter
150+
+ '}';
151+
}
152+
}
153+
154+
static OrderedMultiSetState<RowData> create(
155+
OrderedMultiSetStateContext parameters,
156+
RuntimeContext ctx,
157+
String backendTypeIdentifier) {
158+
switch (parameters.config.getStrategy()) {
159+
case MAP_STATE:
160+
return LinkedMultiSetState.create(parameters, ctx);
161+
case VALUE_STATE:
162+
return ValueStateMultiSetState.create(parameters, ctx);
163+
case ADAPTIVE:
164+
return AdaptiveOrderedMultiSetState.create(
165+
parameters.config,
166+
backendTypeIdentifier,
167+
ValueStateMultiSetState.create(parameters, ctx),
168+
LinkedMultiSetState.create(parameters, ctx));
169+
default:
170+
throw new UnsupportedOperationException(parameters.config.getStrategy().name());
171+
}
172+
}
173+
}

0 commit comments

Comments
 (0)