Skip to content

Commit b6df363

Browse files
committed
fixup: remove SizeChangeInfo
1 parent b302de9 commit b6df363

File tree

5 files changed

+51
-111
lines changed

5 files changed

+51
-111
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ class AdaptiveSequencedMultiSetState implements SequencedMultiSetState<RowData>
6565
}
6666

6767
@Override
68-
public SizeChangeInfo add(RowData element, long timestamp) throws Exception {
68+
public long add(RowData element, long timestamp) throws Exception {
6969
return execute(state -> state.add(element, timestamp), Function.identity(), "add");
7070
}
7171

7272
@Override
73-
public SizeChangeInfo append(RowData element, long timestamp) throws Exception {
73+
public long append(RowData element, long timestamp) throws Exception {
7474
return execute(state -> state.append(element, timestamp), Function.identity(), "append");
7575
}
7676

@@ -90,9 +90,9 @@ public boolean isEmpty() throws IOException {
9090
}
9191

9292
@Override
93-
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData element)
93+
public Tuple3<Long, RemovalResultType, Optional<RowData>> remove(RowData element)
9494
throws Exception {
95-
return execute(state -> state.remove(element), ret -> ret.f2, "remove");
95+
return execute(state -> state.remove(element), ret -> ret.f0, "remove");
9696
}
9797

9898
@Override
@@ -116,7 +116,7 @@ public void clearCache() {
116116

117117
private <T> T execute(
118118
FunctionWithException<SequencedMultiSetState<RowData>, T, Exception> stateOp,
119-
Function<T, SizeChangeInfo> getSizeChangeInfo,
119+
Function<T, Long> getNewSize,
120120
String action)
121121
throws Exception {
122122

@@ -127,21 +127,20 @@ private <T> T execute(
127127
SequencedMultiSetState<RowData> otherState = isUsingLarge ? smallState : largeState;
128128

129129
T result = stateOp.apply(currentState);
130-
SizeChangeInfo sizeInfo = getSizeChangeInfo.apply(result);
130+
final long sizeAfter = getNewSize.apply(result);
131131

132132
final boolean thresholdReached =
133133
isUsingLarge
134-
? sizeInfo.sizeAfter <= switchToSmallThreshold
135-
: sizeInfo.sizeAfter >= switchToLargeThreshold;
134+
? sizeAfter <= switchToSmallThreshold
135+
: sizeAfter >= switchToLargeThreshold;
136136

137137
if (thresholdReached) {
138138
LOG.debug(
139-
"Switch {} -> {} because '{}' resulted in state size change {} -> {}",
139+
"Switch {} -> {} because '{}' resulted in state size reaching {} elements",
140140
currentState.getClass().getSimpleName(),
141141
otherState.getClass().getSimpleName(),
142142
action,
143-
sizeInfo.sizeBefore,
144-
sizeInfo.sizeAfter);
143+
sizeAfter);
145144
switchState(currentState, otherState);
146145
}
147146

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import java.util.Iterator;
3030
import java.util.Optional;
3131

32-
import static org.apache.flink.util.Preconditions.checkArgument;
33-
3432
/**
3533
* This class represents an interface for managing an ordered multi-set state in Apache Flink. It
3634
* provides methods to add, append, and remove elements while maintaining insertion order.
@@ -54,11 +52,17 @@ public interface SequencedMultiSetState<T> {
5452
/**
5553
* Add the given element using a normal (non-multi) set semantics: if a matching element exists
5654
* already, replace it (the timestamp is updated).
55+
*
56+
* @return state size (the number of items) after this operation
5757
*/
58-
SizeChangeInfo add(T element, long timestamp) throws Exception;
58+
long add(T element, long timestamp) throws Exception;
5959

60-
/** Add the given element using a multi-set semantics, i.e. append. */
61-
SizeChangeInfo append(T element, long timestamp) throws Exception;
60+
/**
61+
* Add the given element using a multi-set semantics, i.e. append.
62+
*
63+
* @return state size (the number of items) after this operation
64+
*/
65+
long append(T element, long timestamp) throws Exception;
6266

6367
/** Get iterator over all remaining elements and their timestamps, in order of insertion. */
6468
Iterator<Tuple2<T, Long>> iterator() throws Exception;
@@ -69,8 +73,10 @@ public interface SequencedMultiSetState<T> {
6973
/**
7074
* Remove the given element. If there are multiple instances of the same element, remove the
7175
* first one in insertion order.
76+
*
77+
* @return new size, {@link RemovalResultType}, and optionally an item associated with it
7278
*/
73-
Tuple3<RemovalResultType, Optional<T>, SizeChangeInfo> remove(T element) throws Exception;
79+
Tuple3<Long, RemovalResultType, Optional<T>> remove(T element) throws Exception;
7480

7581
/** Clear the state (in the current key context). */
7682
void clear();
@@ -108,51 +114,6 @@ enum Strategy {
108114
ADAPTIVE
109115
}
110116

111-
/**
112-
* Represents the change in size of a multi-set before and after an operation.
113-
*
114-
* <p>This class is used to track the size of the multi-set state before and after a
115-
* modification, such as adding or removing elements.
116-
*
117-
* <p>Fields:
118-
*
119-
* <ul>
120-
* <li>{@code sizeBefore}: The size of the multi-set before the operation.
121-
* <li>{@code sizeAfter}: The size of the multi-set after the operation.
122-
* </ul>
123-
*
124-
* <p>This class is immutable and provides a simple way to encapsulate size change information.
125-
*/
126-
class SizeChangeInfo {
127-
public final long sizeBefore;
128-
public final long sizeAfter;
129-
130-
public SizeChangeInfo(long sizeBefore, long sizeAfter) {
131-
checkArgument(sizeBefore >= 0);
132-
checkArgument(sizeAfter >= 0);
133-
this.sizeBefore = sizeBefore;
134-
this.sizeAfter = sizeAfter;
135-
}
136-
137-
public boolean isEmpty() {
138-
return sizeAfter == 0;
139-
}
140-
141-
public boolean wasLastElement(int index) {
142-
return sizeBefore - 1 == index;
143-
}
144-
145-
@Override
146-
public String toString() {
147-
return "SizeChangeInfo{"
148-
+ "sizeBefore="
149-
+ sizeBefore
150-
+ ", sizeAfter="
151-
+ sizeAfter
152-
+ '}';
153-
}
154-
}
155-
156117
static SequencedMultiSetState<RowData> create(
157118
SequencedMultiSetStateContext parameters,
158119
RuntimeContext ctx,

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,11 @@ public static ValueStateMultiSetState create(
7979
}
8080

8181
@Override
82-
public SizeChangeInfo add(RowData row, long ts) throws Exception {
82+
public long add(RowData row, long ts) throws Exception {
8383
normalizeRowKind(row);
8484
final Tuple2<RowData, Long> toAdd = Tuple2.of(row, timeSelector.getTimestamp(ts));
8585
final RowData key = asKey(row);
8686
final List<Tuple2<RowData, Long>> list = maybeReadState();
87-
final int oldSize = list.size();
8887

8988
int idx = Integer.MIN_VALUE;
9089
int i = 0;
@@ -101,18 +100,16 @@ public SizeChangeInfo add(RowData row, long ts) throws Exception {
101100
list.set(idx, toAdd);
102101
}
103102
valuesState.update(list);
104-
return new SizeChangeInfo(oldSize, list.size());
103+
return list.size();
105104
}
106105

107106
@Override
108-
public SizeChangeInfo append(RowData row, long timestamp) throws Exception {
107+
public long append(RowData row, long timestamp) throws Exception {
109108
normalizeRowKind(row);
110109
List<Tuple2<RowData, Long>> values = maybeReadState();
111-
long sizeBefore = values.size();
112110
values.add(Tuple2.of(row, timeSelector.getTimestamp(timestamp)));
113-
long sizeAfter = values.size();
114111
valuesState.update(values);
115-
return new SizeChangeInfo(sizeBefore, sizeAfter);
112+
return values.size();
116113
}
117114

118115
@Override
@@ -121,8 +118,7 @@ public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
121118
}
122119

123120
@Override
124-
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData row)
125-
throws Exception {
121+
public Tuple3<Long, RemovalResultType, Optional<RowData>> remove(RowData row) throws Exception {
126122
normalizeRowKind(row);
127123
final RowData key = asKey(row);
128124
final List<Tuple2<RowData, Long>> list = maybeReadState();
@@ -147,7 +143,7 @@ public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowDa
147143
} else {
148144
removed = null;
149145
}
150-
return toRemovalResult(new SizeChangeInfo(oldSize, list.size()), dropIdx, removed, last);
146+
return toRemovalResult(oldSize, list.size(), dropIdx, removed, last);
151147
}
152148

153149
@Override
@@ -195,17 +191,16 @@ private static void normalizeRowKind(RowData row) {
195191
row.setRowKind(RowKind.INSERT);
196192
}
197193

198-
private static Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> toRemovalResult(
199-
SizeChangeInfo sizeChangeInfo, int dropIdx, RowData removed, RowData last) {
194+
private static Tuple3<Long, RemovalResultType, Optional<RowData>> toRemovalResult(
195+
long oldSize, long newSize, int dropIdx, RowData removed, RowData last) {
200196
if (dropIdx < 0) {
201-
return Tuple3.of(RemovalResultType.NOTHING_REMOVED, Optional.empty(), sizeChangeInfo);
202-
} else if (sizeChangeInfo.isEmpty()) {
203-
return Tuple3.of(RemovalResultType.ALL_REMOVED, Optional.of(removed), sizeChangeInfo);
204-
} else if (sizeChangeInfo.wasLastElement(dropIdx)) {
205-
return Tuple3.of(
206-
RemovalResultType.REMOVED_LAST_ADDED, Optional.of(last), sizeChangeInfo);
197+
return Tuple3.of(newSize, RemovalResultType.NOTHING_REMOVED, Optional.empty());
198+
} else if (newSize == 0) {
199+
return Tuple3.of(newSize, RemovalResultType.ALL_REMOVED, Optional.of(removed));
200+
} else if (dropIdx == oldSize - 1) {
201+
return Tuple3.of(newSize, RemovalResultType.REMOVED_LAST_ADDED, Optional.of(last));
207202
} else {
208-
return Tuple3.of(RemovalResultType.REMOVED_OTHER, Optional.empty(), sizeChangeInfo);
203+
return Tuple3.of(newSize, RemovalResultType.REMOVED_OTHER, Optional.empty());
209204
}
210205
}
211206
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,7 @@ public static LinkedMultiSetState create(SequencedMultiSetStateContext p, Runtim
145145

146146
ValueState<MetaSqnInfo> highestSqnState =
147147
ctx.getState(
148-
new ValueStateDescriptor<MetaSqnInfo>(
149-
"highestSqnState", new MetaSqnInfoSerializer()));
148+
new ValueStateDescriptor<>("highestSqnState", new MetaSqnInfoSerializer()));
150149
return new LinkedMultiSetState(
151150
rowToSqnState,
152151
sqnToNodeState,
@@ -157,15 +156,8 @@ public static LinkedMultiSetState create(SequencedMultiSetStateContext p, Runtim
157156
p.config.getTimeSelector());
158157
}
159158

160-
/**
161-
* Add row, replacing any matching existing ones.
162-
*
163-
* @return {@link
164-
* org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.SizeChangeInfo}
165-
* representing the result of the operation
166-
*/
167159
@Override
168-
public SizeChangeInfo add(RowData row, long timestamp) throws Exception {
160+
public long add(RowData row, long timestamp) throws Exception {
169161
final RowDataKey key = toKey(row);
170162
final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value();
171163
final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn;
@@ -209,11 +201,11 @@ public SizeChangeInfo add(RowData row, long timestamp) throws Exception {
209201
sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
210202
}
211203
}
212-
return new SizeChangeInfo(oldSize, newSize);
204+
return newSize;
213205
}
214206

215207
@Override
216-
public SizeChangeInfo append(RowData row, long timestamp) throws Exception {
208+
public long append(RowData row, long timestamp) throws Exception {
217209
final RowDataKey key = toKey(row);
218210
final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value();
219211
final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn;
@@ -244,12 +236,11 @@ public SizeChangeInfo append(RowData row, long timestamp) throws Exception {
244236
if (existed) {
245237
sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
246238
}
247-
return new SizeChangeInfo(oldSize, newSize);
239+
return newSize;
248240
}
249241

250242
@Override
251-
public Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> remove(RowData row)
252-
throws Exception {
243+
public Tuple3<Long, RemovalResultType, Optional<RowData>> remove(RowData row) throws Exception {
253244
final RowDataKey key = toKey(row);
254245
final RowSqnInfo sqnInfo = rowToSqnState.get(key);
255246
final Long rowSqn = sqnInfo == null ? null : sqnInfo.firstSqn;
@@ -342,12 +333,10 @@ private RowDataKey toKey(RowData row0) {
342333
return RowDataKey.toKey(keyExtractor.apply(row0), keyEqualiser, keyHashFunction);
343334
}
344335

345-
private static Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> toRemovalResult(
336+
private static Tuple3<Long, RemovalResultType, Optional<RowData>> toRemovalResult(
346337
RemovalResultType type, @Nullable RowData row, long oldSize) {
347338
checkArgument(oldSize > 0 || type == NOTHING_REMOVED);
348-
return Tuple3.of(
349-
type,
350-
Optional.ofNullable(row),
351-
new SizeChangeInfo(oldSize, type == NOTHING_REMOVED ? oldSize : oldSize - 1));
339+
long newSize = type == NOTHING_REMOVED ? oldSize : oldSize - 1;
340+
return Tuple3.of(newSize, type, Optional.ofNullable(row));
352341
}
353342
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.apache.flink.table.runtime.generated.HashFunction;
5858
import org.apache.flink.table.runtime.generated.RecordEqualiser;
5959
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.RemovalResultType;
60-
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.SizeChangeInfo;
6160
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy;
6261
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
6362
import org.apache.flink.table.types.logical.LogicalType;
@@ -385,7 +384,7 @@ public void testAdaptivity() throws Exception {
385384
ad.remove(row("key"));
386385
}
387386
assertTrue(ad.isEmpty());
388-
assertTrue(runningSize == 0);
387+
assertEquals(0, runningSize);
389388

390389
for (; runningSize < totalSize; runningSize++) {
391390
assertEquals(
@@ -567,30 +566,27 @@ private static void removeAndAssert(
567566
RemovalResultType expectedResultType,
568567
RowData... expectedReturnedRow)
569568
throws Exception {
570-
Tuple3<RemovalResultType, Optional<RowData>, SizeChangeInfo> ret = state.remove(key);
569+
Tuple3<Long, RemovalResultType, Optional<RowData>> ret = state.remove(key);
571570

572-
RemovalResultType resultType = ret.f0;
573-
Optional<RowData> rowData = ret.f1;
574-
SizeChangeInfo sizeChangeInfo = ret.f2;
571+
long sizeAfter = ret.f0;
572+
RemovalResultType resultType = ret.f1;
573+
Optional<RowData> rowData = ret.f2;
575574

576575
assertEquals(expectedResultType, resultType);
577576
switch (resultType) {
578577
case NOTHING_REMOVED:
579-
assertEquals(sizeChangeInfo.sizeBefore, sizeChangeInfo.sizeAfter);
580578
assertEquals(Optional.empty(), rowData);
581579
break;
582580
case ALL_REMOVED:
583-
assertEquals(0, sizeChangeInfo.sizeAfter);
581+
assertEquals(0, sizeAfter);
584582
assertTrue(state.isEmpty(), "state is expected to be empty");
585583
assertEquals(Optional.of(expectedReturnedRow[0]), rowData);
586584
break;
587585
case REMOVED_OTHER:
588-
assertEquals(sizeChangeInfo.sizeBefore - 1, sizeChangeInfo.sizeAfter);
589586
assertFalse(state.isEmpty(), "state is expected to be non-empty");
590587
assertEquals(Optional.empty(), rowData);
591588
break;
592589
case REMOVED_LAST_ADDED:
593-
assertEquals(sizeChangeInfo.sizeBefore - 1, sizeChangeInfo.sizeAfter);
594590
assertFalse(state.isEmpty(), "state is expected to be non-empty");
595591
assertEquals(Optional.of(expectedReturnedRow[0]), rowData);
596592
break;

0 commit comments

Comments
 (0)