diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index dbcfa7367634a..ea07b604c7d19 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -228,8 +228,15 @@ private PrecomputedParameters( this.stateful = stateful; } - static PrecomputedParameters precompute( + public static PrecomputedParameters precompute( boolean immutableTargetType, TypeSerializer[] fieldSerializers) { + return precompute(immutableTargetType, false, fieldSerializers); + } + + public static PrecomputedParameters precompute( + boolean immutableTargetType, + boolean forceFieldsImmutable, + TypeSerializer[] fieldSerializers) { Preconditions.checkNotNull(fieldSerializers); int totalLength = 0; boolean fieldsImmutable = true; @@ -239,7 +246,7 @@ static PrecomputedParameters precompute( if (fieldSerializer != fieldSerializer.duplicate()) { stateful = true; } - if (!fieldSerializer.isImmutableType()) { + if (!forceFieldsImmutable && !fieldSerializer.isImmutableType()) { fieldsImmutable = false; } if (fieldSerializer.getLength() < 0) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java new file mode 100644 index 0000000000000..8abfa76a8dc64 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/AdaptiveSequencedMultiSetState.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState; +import org.apache.flink.util.function.FunctionWithException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An {@link SequencedMultiSetState} that switches dynamically between {@link + * ValueStateMultiSetState} and {@link LinkedMultiSetState} based on the number of elements. 
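+ * + * <p>Switching uses hysteresis: once an operation leaves the size at or above {@code + * switchToLargeThreshold}, all entries are migrated to the large state; once the size falls to + * {@code switchToSmallThreshold} or below, they are migrated back. Requiring the upper threshold + * to be strictly greater than the lower one avoids repeated migrations when the size oscillates + * around a single boundary. A sketch of overriding the thresholds (the values shown match the + * heap defaults; TTL must currently stay disabled, see FLINK-38463):
+ *
+ * <pre>{@code
+ * SequencedMultiSetStateConfig config =
+ *         SequencedMultiSetStateConfig.adaptive(
+ *                 TimeDomain.EVENT_TIME, 400, 300, StateTtlConfig.DISABLED);
+ * }</pre>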
+ */ +class AdaptiveSequencedMultiSetState implements SequencedMultiSetState { + private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSequencedMultiSetState.class); + + private final ValueStateMultiSetState smallState; + private final LinkedMultiSetState largeState; + private final long switchToLargeThreshold; + private final long switchToSmallThreshold; + + AdaptiveSequencedMultiSetState( + ValueStateMultiSetState smallState, + LinkedMultiSetState largeState, + long switchToLargeThreshold, + long switchToSmallThreshold) { + checkArgument(switchToLargeThreshold > switchToSmallThreshold); + this.smallState = smallState; + this.largeState = largeState; + this.switchToLargeThreshold = switchToLargeThreshold; + this.switchToSmallThreshold = switchToSmallThreshold; + LOG.info( + "Created {} with thresholds: {}=>large, {}=>small", + this.getClass().getSimpleName(), + switchToLargeThreshold, + switchToSmallThreshold); + } + + @Override + public StateChangeInfo add(RowData element, long timestamp) throws Exception { + return execute( + state -> state.add(element, timestamp), StateChangeInfo::getSizeAfter, "add"); + } + + @Override + public StateChangeInfo append(RowData element, long timestamp) throws Exception { + return execute( + state -> state.append(element, timestamp), StateChangeInfo::getSizeAfter, "append"); + } + + @Override + public Iterator> iterator() throws Exception { + if (smallState.isEmpty()) { + return largeState.iterator(); + } else { + return smallState.iterator(); + } + } + + @Override + public boolean isEmpty() throws IOException { + // large state check is faster + return largeState.isEmpty() && smallState.isEmpty(); + } + + public StateChangeInfo remove(RowData element) throws Exception { + return execute(state -> state.remove(element), StateChangeInfo::getSizeAfter, "remove"); + } + + @Override + public void clear() { + clearCache(); + smallState.clear(); + largeState.clear(); + } + + @Override + public void loadCache() throws IOException { + smallState.loadCache(); + largeState.loadCache(); + } + + @Override + public void clearCache() { + smallState.clearCache(); + largeState.clearCache(); + } + + private T execute( + FunctionWithException, T, Exception> stateOp, + Function getNewSize, + String action) + throws Exception { + + final boolean isUsingLarge = isIsUsingLargeState(); + + // start with small state, i.e. choose smallState when both are empty + SequencedMultiSetState currentState = isUsingLarge ? largeState : smallState; + SequencedMultiSetState otherState = isUsingLarge ? smallState : largeState; + + T result = stateOp.apply(currentState); + final long sizeAfter = getNewSize.apply(result); + + final boolean thresholdReached = + isUsingLarge + ? 
sizeAfter <= switchToSmallThreshold + : sizeAfter >= switchToLargeThreshold; + + if (thresholdReached) { + LOG.debug( + "Switch {} -> {} because '{}' resulted in state size reaching {} elements", + currentState.getClass().getSimpleName(), + otherState.getClass().getSimpleName(), + action, + sizeAfter); + switchState(currentState, otherState); + } + + clearCache(); + return result; + } + + @VisibleForTesting + boolean isIsUsingLargeState() throws IOException { + smallState.loadCache(); + if (!smallState.isEmpty()) { + return false; + } + largeState.loadCache(); + return !largeState.isEmpty(); + } + + private void switchState( + SequencedMultiSetState src, SequencedMultiSetState dst) + throws Exception { + Iterator> it = src.iterator(); + while (it.hasNext()) { + Tuple2 next = it.next(); + dst.append(next.f0, next.f1); + } + src.clear(); + } + + public static AdaptiveSequencedMultiSetState create( + SequencedMultiSetStateConfig sequencedMultiSetStateConfig, + String backendTypeIdentifier, + ValueStateMultiSetState smallState, + LinkedMultiSetState largeState) { + return new AdaptiveSequencedMultiSetState( + smallState, + largeState, + sequencedMultiSetStateConfig + .getAdaptiveHighThresholdOverride() + .orElse( + isHeap(backendTypeIdentifier) + ? ADAPTIVE_HEAP_HIGH_THRESHOLD + : ADAPTIVE_ROCKSDB_HIGH_THRESHOLD), + sequencedMultiSetStateConfig + .getAdaptiveLowThresholdOverride() + .orElse( + isHeap(backendTypeIdentifier) + ? ADAPTIVE_HEAP_LOW_THRESHOLD + : ADAPTIVE_ROCKSDB_LOW_THRESHOLD)); + } + + private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400; + private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300; + private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50; + private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40; + + private static boolean isHeap(String stateBackend) { + String trim = stateBackend.trim(); + return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap"); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java new file mode 100644 index 0000000000000..5f3140b6d24f3 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetState.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents an interface for managing an ordered multi-set state in Apache Flink. It + * provides methods to add, append, and remove elements while maintaining insertion order. + * + *

<p>The state supports two types of semantics for adding elements:
+ *
+ * <ul>
+ *   <li>Normal Set Semantics: Replaces an existing matching element with the new one.
+ *   <li>Multi-Set Semantics: Appends the new element, allowing duplicates.
+ * </ul>
+ *
+ * <p>
Removal operations are supported with different result types, indicating the outcome of the + * removal process, such as whether all elements were removed, the last added element was removed, + * or no elements were removed. + * + * @param The type of elements stored in the state. + */ +@Internal +public interface SequencedMultiSetState { + + /** + * Add the given element using the normal (non-multi) set semantics: if a matching element + * exists already, replace it (the timestamp is updated). + */ + StateChangeInfo add(T element, long timestamp) throws Exception; + + /** Add the given element using the multi-set semantics, i.e. append. */ + StateChangeInfo append(T element, long timestamp) throws Exception; + + /** + * Remove the given element. If there are multiple instances of the same element, remove the + * first one in insertion order. + */ + StateChangeInfo remove(T element) throws Exception; + + /** Represents a result of a state changing operation. */ + class StateChangeInfo { + private final StateChangeType changeType; + private final long sizeBefore; + private final long sizeAfter; + @Nullable private final T payload; // depends on the change type + + public static StateChangeInfo forAddition(long sizeBefore, long sizeAfter) { + return new StateChangeInfo<>(sizeBefore, sizeAfter, StateChangeType.ADDITION, null); + } + + public static StateChangeInfo forRemovedLastAdded( + long sizeBefore, long sizeAfter, T payload) { + return new StateChangeInfo<>( + sizeBefore, sizeAfter, StateChangeType.REMOVAL_LAST_ADDED, payload); + } + + public static StateChangeInfo forRemovedOther(long sizeBefore, long sizeAfter) { + return new StateChangeInfo<>( + sizeBefore, sizeAfter, StateChangeType.REMOVAL_OTHER, null); + } + + public static StateChangeInfo forAllRemoved( + long sizeBefore, long sizeAfter, T payload) { + return new StateChangeInfo<>( + sizeBefore, sizeAfter, StateChangeType.REMOVAL_ALL, payload); + } + + public static StateChangeInfo forRemovalNotFound(long size) { + return new StateChangeInfo<>(size, size, StateChangeType.REMOVAL_NOT_FOUND, null); + } + + private StateChangeInfo( + long sizeBefore, long sizeAfter, StateChangeType changeType, @Nullable T payload) { + changeType.validate(sizeBefore, sizeAfter, payload); + this.sizeBefore = sizeBefore; + this.sizeAfter = sizeAfter; + this.changeType = changeType; + this.payload = payload; + } + + public long getSizeAfter() { + return sizeAfter; + } + + public boolean wasEmpty() { + return sizeBefore == 0; + } + + public StateChangeType getChangeType() { + return changeType; + } + + /** The payload depends on the {@link StateChangeType}. */ + public Optional getPayload() { + return Optional.ofNullable(payload); + } + } + + /** Get iterator over all remaining elements and their timestamps, in order of insertion. */ + Iterator> iterator() throws Exception; + + /** Tells whether any state exists (in the given key context). */ + boolean isEmpty() throws IOException; + + /** Clear the state (in the current key context). */ + void clear(); + + /** Load cache. */ + void loadCache() throws IOException; + + /** Clear caches. */ + void clearCache(); + + /** Removal Result Type. */ + enum StateChangeType { + /** + * An element was added or appended to the state. The result will not contain any elements. + */ + ADDITION { + @Override + public void validate(long sizeBefore, long sizeAfter, T payload) { + checkArgument(sizeAfter == sizeBefore + 1 || sizeAfter == sizeBefore); + } + }, + /** + * Nothing was removed (e.g. 
as a result of TTL or not matching key), the result will not + * contain any elements. + */ + REMOVAL_NOT_FOUND { + @Override + public void validate(long sizeBefore, long sizeAfter, T payload) { + checkArgument(sizeAfter == sizeBefore); + checkArgument(payload == null); + } + }, + /** All elements were removed. The result will contain the last removed element. */ + REMOVAL_ALL { + @Override + public void validate(long sizeBefore, long sizeAfter, T payload) { + checkArgument(sizeBefore > 0); + checkArgument(sizeAfter == 0); + checkNotNull(payload); + } + }, + /** + * The most recently added element was removed. The result will contain the element added + * before it. + */ + REMOVAL_LAST_ADDED { + @Override + public void validate(long sizeBefore, long sizeAfter, T payload) { + checkArgument(sizeAfter == sizeBefore - 1); + checkNotNull(payload); + } + }, + /** + * An element was removed, it was not the most recently added, there are more elements. The + * result will not contain any elements + */ + REMOVAL_OTHER { + @Override + public void validate(long sizeBefore, long sizeAfter, T payload) { + checkArgument(sizeAfter == sizeBefore - 1); + checkArgument(payload == null); + } + }; + + public abstract void validate(long sizeBefore, long sizeAfter, T payload); + } + + enum Strategy { + VALUE_STATE, + MAP_STATE, + ADAPTIVE + } + + static SequencedMultiSetState create( + SequencedMultiSetStateContext parameters, + RuntimeContext ctx, + String backendTypeIdentifier) { + switch (parameters.config.getStrategy()) { + case MAP_STATE: + return LinkedMultiSetState.create(parameters, ctx); + case VALUE_STATE: + return ValueStateMultiSetState.create(parameters, ctx); + case ADAPTIVE: + return AdaptiveSequencedMultiSetState.create( + parameters.config, + backendTypeIdentifier, + ValueStateMultiSetState.create(parameters, ctx), + LinkedMultiSetState.create(parameters, ctx)); + default: + throw new UnsupportedOperationException(parameters.config.getStrategy().name()); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java new file mode 100644 index 0000000000000..a6b16c343d860 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.streaming.api.TimeDomain; + +import javax.annotation.Nullable; + +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Configuration for {@link SequencedMultiSetState}. */ +public class SequencedMultiSetStateConfig { + + private final SequencedMultiSetState.Strategy strategy; + private final @Nullable Long adaptiveHighThresholdOverride; + private final @Nullable Long adaptiveLowThresholdOverride; + private final StateTtlConfig ttlConfig; + private final TimeSelector ttlTimeSelector; + + public SequencedMultiSetStateConfig( + SequencedMultiSetState.Strategy strategy, + @Nullable Long adaptiveHighThresholdOverride, + @Nullable Long adaptiveLowThresholdOverride, + StateTtlConfig ttlConfig, + TimeDomain ttlTimeDomain) { + this( + strategy, + adaptiveHighThresholdOverride, + adaptiveLowThresholdOverride, + ttlConfig, + TimeSelector.getTimeDomain(ttlTimeDomain)); + } + + public SequencedMultiSetStateConfig( + SequencedMultiSetState.Strategy strategy, + @Nullable Long adaptiveHighThresholdOverride, + @Nullable Long adaptiveLowThresholdOverride, + StateTtlConfig ttlConfig, + TimeSelector ttlTimeSelector) { + checkArgument( + !ttlConfig.isEnabled(), + "TTL is not supported"); // https://issues.apache.org/jira/browse/FLINK-38463 + this.strategy = strategy; + this.adaptiveHighThresholdOverride = adaptiveHighThresholdOverride; + this.adaptiveLowThresholdOverride = adaptiveLowThresholdOverride; + this.ttlConfig = ttlConfig; + this.ttlTimeSelector = ttlTimeSelector; + } + + public static SequencedMultiSetStateConfig defaults( + TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) { + return forValue(ttlTimeDomain, ttlConfig); + } + + public static SequencedMultiSetStateConfig forMap( + TimeDomain ttlTimeDomain, StateTtlConfig ttlConfig) { + return new SequencedMultiSetStateConfig( + SequencedMultiSetState.Strategy.MAP_STATE, null, null, ttlConfig, ttlTimeDomain); + } + + public static SequencedMultiSetStateConfig forValue( + TimeDomain ttlTimeDomain, StateTtlConfig ttl) { + return new SequencedMultiSetStateConfig( + SequencedMultiSetState.Strategy.VALUE_STATE, null, null, ttl, ttlTimeDomain); + } + + public static SequencedMultiSetStateConfig adaptive( + TimeDomain ttlTimeDomain, + long adaptiveHighThresholdOverride, + long adaptiveLowThresholdOverride, + StateTtlConfig ttl) { + return new SequencedMultiSetStateConfig( + SequencedMultiSetState.Strategy.ADAPTIVE, + adaptiveHighThresholdOverride, + adaptiveLowThresholdOverride, + ttl, + ttlTimeDomain); + } + + public TimeSelector getTimeSelector() { + return ttlTimeSelector; + } + + public SequencedMultiSetState.Strategy getStrategy() { + return strategy; + } + + public Optional getAdaptiveHighThresholdOverride() { + return Optional.ofNullable(adaptiveHighThresholdOverride); + } + + public Optional getAdaptiveLowThresholdOverride() { + return Optional.ofNullable(adaptiveLowThresholdOverride); + } + + public StateTtlConfig getTtlConfig() { + return ttlConfig; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java new file mode 100644 index 0000000000000..b7412d2abcd98 --- /dev/null +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateContext.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedHashFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; + +import java.util.function.Function; + +/** {@link SequencedMultiSetState} (creation) context. */ +public class SequencedMultiSetStateContext { + + public final SequencedMultiSetStateConfig config; + public final TypeSerializer keySerializer; + public final GeneratedRecordEqualiser generatedKeyEqualiser; + public final GeneratedHashFunction generatedKeyHashFunction; + public final TypeSerializer recordSerializer; + public final Function keyExtractor; + + public SequencedMultiSetStateContext( + TypeSerializer keySerializer, + GeneratedRecordEqualiser generatedKeyEqualiser, + GeneratedHashFunction generatedKeyHashFunction, + TypeSerializer recordSerializer, + Function keyExtractor, + SequencedMultiSetStateConfig config) { + this.keySerializer = keySerializer; + this.generatedKeyEqualiser = generatedKeyEqualiser; + this.generatedKeyHashFunction = generatedKeyHashFunction; + this.recordSerializer = recordSerializer; + this.keyExtractor = keyExtractor; + this.config = config; + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java new file mode 100644 index 0000000000000..40fe6f456e601 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/TimeSelector.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.util.clock.SystemClock; + +@Internal +@FunctionalInterface +public interface TimeSelector { + + long getTimestamp(long elementTimestamp); + + static TimeSelector getTimeDomain(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME: + return elementTimestamp -> elementTimestamp; + case PROCESSING_TIME: + return elementTimestamp -> SystemClock.getInstance().absoluteTimeMillis(); + default: + throw new IllegalStateException("unknown time domain: " + timeDomain); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java new file mode 100644 index 0000000000000..3bc88dc67e38e --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/ValueStateMultiSetState.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.types.RowKind; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +/** + * Simple implementation of {@link SequencedMultiSetState} based on plain {@code ValueState}. 
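+ * + * <p>All elements for the current key live in one {@code List} inside a single {@code ValueState} + * entry, so every operation reads and rewrites the whole list; this is cheap while the list stays + * small but degrades as it grows, which motivates the size-based switching in {@link + * AdaptiveSequencedMultiSetState}. A sketch of the two add semantics (the {@code state} and + * {@code row} values and the timestamps are assumed for illustration):
+ *
+ * <pre>{@code
+ * state.add(row, t1);    // no matching element yet: inserted, size == 1
+ * state.add(row, t2);    // matching element exists: replaced, size stays 1
+ * state.append(row, t3); // multi-set semantics: appended, size == 2
+ * }</pre>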
+ */ +class ValueStateMultiSetState implements SequencedMultiSetState { + + private final ValueState>> valuesState; + private final RecordEqualiser keyEqualiser; + private final Function keyExtractor; + private final TimeSelector timeSelector; + private List> cache; + + ValueStateMultiSetState( + ValueState>> valuesState, + RecordEqualiser keyEqualiser, + Function keyExtractor, + TimeSelector timeSelector) { + this.valuesState = valuesState; + this.keyEqualiser = keyEqualiser; + this.keyExtractor = keyExtractor; + this.timeSelector = timeSelector; + } + + public static ValueStateMultiSetState create( + SequencedMultiSetStateContext p, RuntimeContext ctx) { + //noinspection rawtypes,unchecked + return new ValueStateMultiSetState( + ctx.getState( + new ValueStateDescriptor<>( + "list", + new ListSerializer<>( + new TupleSerializer( + Tuple2.class, + new TypeSerializer[] { + p.recordSerializer, LongSerializer.INSTANCE + })))), + p.generatedKeyEqualiser.newInstance(ctx.getUserCodeClassLoader()), + p.keyExtractor, + p.config.getTimeSelector()); + } + + @Override + public StateChangeInfo add(RowData row, long ts) throws Exception { + normalizeRowKind(row); + final Tuple2 toAdd = Tuple2.of(row, timeSelector.getTimestamp(ts)); + final RowData key = asKey(row); + final List> list = maybeReadState(); + final long oldSize = list.size(); + + int idx = Integer.MIN_VALUE; + int i = 0; + for (Tuple2 t : list) { + if (keyEqualiser.equals(asKey(t.f0), key)) { + idx = i; + break; + } + i++; + } + if (idx < 0) { + list.add(toAdd); + } else { + list.set(idx, toAdd); + } + valuesState.update(list); + return StateChangeInfo.forAddition(oldSize, list.size()); + } + + @Override + public StateChangeInfo append(RowData row, long timestamp) throws Exception { + normalizeRowKind(row); + List> values = maybeReadState(); + final long oldSize = values.size(); + values.add(Tuple2.of(row, timeSelector.getTimestamp(timestamp))); + valuesState.update(values); + return StateChangeInfo.forAddition(oldSize, values.size()); + } + + @Override + public Iterator> iterator() throws Exception { + return maybeReadState().iterator(); + } + + @Override + public StateChangeInfo remove(RowData row) throws Exception { + normalizeRowKind(row); + final RowData key = asKey(row); + final List> list = maybeReadState(); + final int oldSize = list.size(); + + int dropIdx = Integer.MIN_VALUE; + RowData last = null; + int i = 0; + for (Tuple2 t : list) { + if (keyEqualiser.equals(key, asKey(t.f0))) { + dropIdx = i; + break; + } else { + last = t.f0; + } + i++; + } + final RowData removed; + if (dropIdx >= 0) { + list.remove(dropIdx); + removed = row; + valuesState.update(list); + } else { + removed = null; + } + return toRemovalResult(oldSize, list.size(), dropIdx, removed, last); + } + + @Override + public void loadCache() throws IOException { + cache = readState(); + } + + @Override + public void clearCache() { + cache = null; + } + + private List> maybeReadState() throws IOException { + if (cache != null) { + return cache; + } + return readState(); + } + + private List> readState() throws IOException { + List> value = valuesState.value(); + if (value == null) { + value = new ArrayList<>(); + } + return value; + } + + @Override + public void clear() { + clearCache(); + valuesState.clear(); + } + + @Override + public boolean isEmpty() throws IOException { + List> list = cache == null ? 
valuesState.value() : cache; + return list == null || list.isEmpty(); + } + + private RowData asKey(RowData row) { + return keyExtractor.apply(row); + } + + private static void normalizeRowKind(RowData row) { + row.setRowKind(RowKind.INSERT); + } + + private static StateChangeInfo toRemovalResult( + long oldSize, long newSize, int dropIdx, RowData removed, RowData last) { + if (dropIdx < 0) { + return StateChangeInfo.forRemovalNotFound(oldSize); + } else if (newSize == 0) { + return StateChangeInfo.forAllRemoved(oldSize, newSize, removed); + } else if (dropIdx == oldSize - 1) { + return StateChangeInfo.forRemovedLastAdded(oldSize, newSize, last); + } else { + return StateChangeInfo.forRemovedOther(oldSize, newSize); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java new file mode 100644 index 0000000000000..840fe007ebf84 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState; +import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext; +import org.apache.flink.table.runtime.sequencedmultisetstate.TimeSelector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class implements an ordered multi-set state backend using Flink's state primitives. It + * maintains the insertion order of elements and supports operations such as adding, appending, and + * removing elements. 
The state is backed by Flink's {@code MapState} and {@code ValueState} to store and manage + * the relationships between rows and sequence numbers (SQNs). + * + * <p>Key features of this state implementation: + * + * <ul>
+ *   <li>Maintains insertion order of elements using a doubly-linked list structure.
+ *   <li>Supports both normal set semantics (replacing existing elements) and multi-set semantics + *       (allowing duplicates).
+ *   <li>Efficiently tracks the highest sequence number and links between elements for fast + *       traversal and updates.
+ *   <li>Provides methods to add, append, and remove elements with appropriate handling of state + *       transitions.
+ * </ul>
+ * + * <p>Note: This implementation is marked as {@code @Internal} and is intended for internal use + * within Flink. It may be subject to changes in future versions. + * + * <p>Usage: + * + * <ul>
+ *   <li>Use the {@link #add(RowData, long)} method to add an element, replacing any existing + *       matching element.
+ *   <li>Use the {@link #append(RowData, long)} method to append an element, allowing duplicates.
+ *   <li>Use the {@link #remove(RowData)} method to remove an element, with detailed removal result + *       types.
+ * </ul>
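+ * + * <p>Each operation reads and writes only a bounded number of map entries (the affected node, its + * neighbours, and the size/highest-SQN entry), so per-record cost does not grow with the number of + * stored elements; only {@link #iterator()} materializes all nodes. A sketch of consuming the + * result of {@link #remove(RowData)}, based on the {@link SequencedMultiSetState.StateChangeInfo} + * contract (the {@code state} and {@code row} values are assumed for illustration):
+ *
+ * <pre>{@code
+ * StateChangeInfo<RowData> info = state.remove(row);
+ * switch (info.getChangeType()) {
+ *     case REMOVAL_ALL:
+ *         // state is now empty; payload is the removed element
+ *         break;
+ *     case REMOVAL_LAST_ADDED:
+ *         // payload is the element that is now the most recently added
+ *         break;
+ *     case REMOVAL_OTHER:
+ *     case REMOVAL_NOT_FOUND:
+ *         // no payload
+ *         break;
+ * }
+ * }</pre>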
+ * + * @see SequencedMultiSetState + * @see org.apache.flink.api.common.state.MapState + * @see org.apache.flink.api.common.state.ValueState + */ +@Internal +public class LinkedMultiSetState implements SequencedMultiSetState { + + // maps rows to SQNs (first and last SQN for a row (same in case of upsert key)) + private final MapState rowToSqnState; + // maps SQNs to Nodes, which comprise a doubly-linked list + private final MapState sqnToNodeState; + // highest sequence number; also latest emitted downstream + private final ValueState highestSqnAndSizeState; + + private final RecordEqualiser keyEqualiser; + private final HashFunction keyHashFunction; + private final Function keyExtractor; + private final TimeSelector timeSelector; + + private LinkedMultiSetState( + MapState rowToSqnState, + MapState sqnToNodeState, + ValueState highestSqnAndSizeState, + RecordEqualiser keyEqualiser, + HashFunction keyHashFunction, + Function keyExtractor, + TimeSelector timeSelector) { + this.rowToSqnState = checkNotNull(rowToSqnState); + this.sqnToNodeState = checkNotNull(sqnToNodeState); + this.highestSqnAndSizeState = checkNotNull(highestSqnAndSizeState); + this.keyEqualiser = checkNotNull(keyEqualiser); + this.keyHashFunction = checkNotNull(keyHashFunction); + this.keyExtractor = keyExtractor; + this.timeSelector = timeSelector; + } + + public static LinkedMultiSetState create(SequencedMultiSetStateContext p, RuntimeContext ctx) { + + RecordEqualiser keyEqualiser = + p.generatedKeyEqualiser.newInstance(ctx.getUserCodeClassLoader()); + HashFunction keyHashFunction = + p.generatedKeyHashFunction.newInstance(ctx.getUserCodeClassLoader()); + + MapState rowToSqnState = + ctx.getMapState( + new MapStateDescriptor<>( + "rowToSqnState", + new RowDataKeySerializer( + p.keySerializer, + keyEqualiser, + keyHashFunction, + p.generatedKeyEqualiser, + p.generatedKeyHashFunction), + new RowSqnInfoSerializer())); + MapState sqnToNodeState = + ctx.getMapState( + new MapStateDescriptor<>( + "sqnToNodeState", + LongSerializer.INSTANCE, + new NodeSerializer(p.recordSerializer))); + + ValueState highestSqnState = + ctx.getState( + new ValueStateDescriptor<>("highestSqnState", new MetaSqnInfoSerializer())); + return new LinkedMultiSetState( + rowToSqnState, + sqnToNodeState, + highestSqnState, + keyEqualiser, + keyHashFunction, + p.keyExtractor, + p.config.getTimeSelector()); + } + + @Override + public StateChangeInfo add(RowData row, long timestamp) throws Exception { + final RowDataKey key = toKey(row); + final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value(); + final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn; + final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size; + final RowSqnInfo rowSqnInfo = rowToSqnState.get(key); + final Long rowSqn = rowSqnInfo == null ? 
null : rowToSqnState.get(key).firstSqn; + final boolean isNewRowKey = rowSqn == null; // it's a 1st such record 'row' + final boolean isNewContextKey = highSqn == null; // 1st a record for current context key + + final Long oldSqn; + final long newSqn; + final long newSize; + + if (isNewContextKey && isNewRowKey) { + // no state at all for this context key + oldSqn = null; + newSqn = 0; + newSize = 1; + } else if (isNewRowKey) { + // add new rowKey "to the end" + oldSqn = null; + newSqn = highSqn + 1; + newSize = oldSize + 1; + } else { + // replace an existing row by rowKey + oldSqn = newSqn = rowSqn; + newSize = oldSize; + } + + timestamp = timeSelector.getTimestamp(timestamp); + + sqnToNodeState.put( + newSqn, + isNewRowKey + ? new Node(row, newSqn, highSqn, null, null, timestamp) + : sqnToNodeState.get(oldSqn).withRow(row, timestamp)); + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); + if (isNewRowKey) { + rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); + if (!isNewContextKey) { + sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); + } + } + return StateChangeInfo.forAddition(oldSize, newSize); + } + + @Override + public StateChangeInfo append(RowData row, long timestamp) throws Exception { + final RowDataKey key = toKey(row); + final MetaSqnInfo highSqnAndSize = highestSqnAndSizeState.value(); + final Long highSqn = highSqnAndSize == null ? null : highSqnAndSize.highSqn; + final long oldSize = highSqnAndSize == null ? 0 : highSqnAndSize.size; + final boolean existed = highSqn != null; + final long newSqn = (existed ? highSqn + 1 : 0); + final Node newNode = + new Node( + row, + newSqn, + highSqn, /*next*/ + null, /*nextForRecord*/ + null, + timeSelector.getTimestamp(timestamp)); + final long newSize = oldSize + 1; + + final RowSqnInfo sqnInfo = existed ? rowToSqnState.get(key) : null; + final Long rowSqn = sqnInfo == null ? null : sqnInfo.firstSqn; + if (rowSqn == null) { + rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); + } else { + rowToSqnState.put(key, RowSqnInfo.of(rowSqn, newSqn)); + sqnToNodeState.put( + sqnInfo.lastSqn, sqnToNodeState.get(sqnInfo.lastSqn).withNextForRecord(newSqn)); + } + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); + sqnToNodeState.put(newSqn, newNode); + if (existed) { + sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); + } + return StateChangeInfo.forAddition(oldSize, newSize); + } + + public StateChangeInfo remove(RowData row) throws Exception { + final RowDataKey key = toKey(row); + final RowSqnInfo sqnInfo = rowToSqnState.get(key); + final Long rowSqn = sqnInfo == null ? null : sqnInfo.firstSqn; + final MetaSqnInfo highSqnStateAndSize = highestSqnAndSizeState.value(); + final long oldSize = highSqnStateAndSize == null ? 
0L : highSqnStateAndSize.size; + if (rowSqn == null) { + return StateChangeInfo.forRemovalNotFound(oldSize); + } + final Node node = sqnToNodeState.get(rowSqn); + + final Node prev = removeNode(node, key, highSqnStateAndSize); + + if (!node.isHighestSqn()) { + return StateChangeInfo.forRemovedOther(oldSize, oldSize - 1); + } else if (prev == null) { + return StateChangeInfo.forAllRemoved(oldSize, oldSize - 1, row); + } else { + return StateChangeInfo.forRemovedLastAdded(oldSize, oldSize - 1, prev.row); + } + } + + @Override + public void clear() { + clearCache(); + sqnToNodeState.clear(); + highestSqnAndSizeState.clear(); + rowToSqnState.clear(); + } + + @Override + public void loadCache() {} + + @Override + public void clearCache() {} + + private Node removeNode(Node node, RowDataKey key, MetaSqnInfo highSqnStateAndSize) + throws Exception { + + if (node.isLowestSqn() && node.isHighestSqn()) { + // fast track: if last record for PK then cleanup everything and return + clear(); + return null; + } + + sqnToNodeState.remove(node.getSqn()); + highestSqnAndSizeState.update( + MetaSqnInfo.of( + node.isHighestSqn() ? node.prevSqn : highSqnStateAndSize.highSqn, + highSqnStateAndSize.size - 1)); + if (node.isLastForRecord()) { + rowToSqnState.remove(key); + } else { + rowToSqnState.put(key, rowToSqnState.get(key).withFirstSqn(node.nextSqnForRecord)); + } + // link prev node to next + Node prev = null; + if (node.hasPrev()) { + prev = sqnToNodeState.get(node.prevSqn).withNext(node.nextSqn); + sqnToNodeState.put(node.prevSqn, prev); + } + // link next node to prev + if (node.hasNext()) { + sqnToNodeState.put( + node.nextSqn, sqnToNodeState.get(node.nextSqn).withPrev(node.prevSqn)); + } + return prev; + } + + @Override + public Iterator> iterator() throws Exception { + // this can be implemented more efficiently + // however, the expected use case is to migrate all the values either to or from the memory + // state backend, so loading all into memory seems fine + List list = new ArrayList<>(); + for (Node node : sqnToNodeState.values()) { + list.add(node); + } + list.sort(Comparator.comparingLong(Node::getSqn)); + return list.stream().map(node -> Tuple2.of(node.row, node.timestamp)).iterator(); + } + + @Override + public boolean isEmpty() throws IOException { + return highestSqnAndSizeState.value() == null; + } + + private RowDataKey toKey(RowData row0) { + return RowDataKey.toKey(keyExtractor.apply(row0), keyEqualiser, keyHashFunction); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java new file mode 100644 index 0000000000000..a7354872ece75 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfo.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +/** Stores the highest assigned SQN and the current size of the state. */ +class MetaSqnInfo { + public final long highSqn; + public final long size; + + public MetaSqnInfo(long highSqn, long size) { + Preconditions.checkArgument(size >= 0); + this.highSqn = highSqn; + this.size = size; + } + + public static MetaSqnInfo of(long highSqn, long size) { + return new MetaSqnInfo(highSqn, size); + } + + @Override + public String toString() { + return "MetaSqnInfo{" + "highSqn=" + highSqn + ", size=" + size + '}'; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MetaSqnInfo)) { + return false; + } + MetaSqnInfo that = (MetaSqnInfo) o; + return highSqn == that.highSqn && size == that.size; + } + + @Override + public int hashCode() { + return Objects.hash(highSqn, size); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java new file mode 100644 index 0000000000000..4819d6d4dfdbf --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; + +@SuppressWarnings("ClassEscapesDefinedScope") +public class MetaSqnInfoSerializer extends CompositeSerializer { + + public MetaSqnInfoSerializer() { + this(null, LongSerializer.INSTANCE, LongSerializer.INSTANCE); + } + + protected MetaSqnInfoSerializer( + PrecomputedParameters precomputed, TypeSerializer... 
fieldSerializers) { + super( + PrecomputedParameters.precompute( + true, true, (TypeSerializer[]) fieldSerializers), + fieldSerializers); + } + + @Override + public MetaSqnInfo createInstance(Object... values) { + return new MetaSqnInfo((Long) values[0], (Long) values[1]); + } + + @Override + protected void setField(MetaSqnInfo sqnInfo, int index, Object fieldValue) { + throw new UnsupportedOperationException(); + } + + @Override + protected Object getField(MetaSqnInfo value, int index) { + switch (index) { + case 0: + return value.highSqn; + case 1: + return value.size; + default: + throw new IllegalArgumentException("invalid index: " + index); + } + } + + @Override + protected CompositeSerializer createSerializerInstance( + PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { + return new MetaSqnInfoSerializer(precomputed, originalSerializers); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new MetaSqnInfoSerializerSnapshot(this); + } + + public static class MetaSqnInfoSerializerSnapshot + extends CompositeTypeSerializerSnapshot { + + @SuppressWarnings("unused") + public MetaSqnInfoSerializerSnapshot() {} + + MetaSqnInfoSerializerSnapshot(MetaSqnInfoSerializer serializer) { + super(serializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return 0; + } + + @Override + protected TypeSerializer[] getNestedSerializers(MetaSqnInfoSerializer outerSerializer) { + return new TypeSerializer[] {LongSerializer.INSTANCE, LongSerializer.INSTANCE}; + } + + @Override + protected MetaSqnInfoSerializer createOuterSerializerWithNestedSerializers( + TypeSerializer[] nestedSerializers) { + return new MetaSqnInfoSerializer(null, nestedSerializers); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java new file mode 100644 index 0000000000000..232437ab310ca --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/Node.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.table.data.RowData; + +import java.util.Objects; + +class Node { + final RowData row; + private final long sqn; + final Long prevSqn; + final Long nextSqn; + final Long nextSqnForRecord; + final Long timestamp; // for future TTL support + + Node(RowData row, long sqn, Long prevSqn, Long nextSqn, Long nextSqnForRecord, Long timestamp) { + this.row = row; + this.sqn = sqn; + this.prevSqn = prevSqn; + this.nextSqn = nextSqn; + this.nextSqnForRecord = nextSqnForRecord; + this.timestamp = timestamp; + } + + public boolean isLastForRecord() { + return nextSqnForRecord == null; + } + + public boolean isLowestSqn() { + return !hasPrev(); + } + + public boolean isHighestSqn() { + return !hasNext(); + } + + public boolean hasPrev() { + return prevSqn != null; + } + + public boolean hasNext() { + return nextSqn != null; + } + + public Node withNextForRecord(Long nextSeqNoForRecord) { + return new Node(row, sqn, prevSqn, nextSqn, nextSeqNoForRecord, timestamp); + } + + public Node withNext(Long nextSeqNo) { + return new Node(row, sqn, prevSqn, nextSeqNo, nextSqnForRecord, timestamp); + } + + public Node withPrev(Long prevSeqNo) { + return new Node(row, sqn, prevSeqNo, nextSqn, nextSqnForRecord, timestamp); + } + + public Node withRow(RowData row, long timestamp) { + return new Node(row, sqn, prevSqn, nextSqn, nextSqnForRecord, timestamp); + } + + public RowData getRow() { + return row; + } + + public long getSqn() { + return sqn; + } + + public Long getPrevSqn() { + return prevSqn; + } + + public Long getNextSqn() { + return nextSqn; + } + + public Long getNextSqnForRecord() { + return nextSqnForRecord; + } + + public Long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Node)) { + return false; + } + Node node = (Node) o; + // do not compare row data since: + // 1. the type might be different after deserialization, e.g. GenericRowData vs + // BinaryRowData + // 2. proper comparison requires (generated) equalizer + // 3. equals is only used in tests (as opposed to RowDataKey) + return sqn == node.sqn + && Objects.equals(prevSqn, node.prevSqn) + && Objects.equals(nextSqn, node.nextSqn) + && Objects.equals(nextSqnForRecord, node.nextSqnForRecord); + } + + @Override + public int hashCode() { + // rowData is ignored - see equals + return Objects.hash(sqn, prevSqn, nextSqn, nextSqnForRecord); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java new file mode 100644 index 0000000000000..aa6577e4feddc --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializer.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; +import org.apache.flink.table.data.RowData; + +import java.util.ArrayList; +import java.util.List; + +/** {@link TypeSerializer} for {@link Node}. */ +@SuppressWarnings("NullableProblems") +public class NodeSerializer extends CompositeSerializer { + + private static final LongSerializer LONG_SERIALIZER = LongSerializer.INSTANCE; + private static final TypeSerializer NULLABLE_LONG_SERIALIZER = + NullableSerializer.wrap(LONG_SERIALIZER, true); + + public NodeSerializer(TypeSerializer serializer) { + this(null, NodeField.getFieldSerializers(serializer)); + } + + protected NodeSerializer( + PrecomputedParameters precomputed, TypeSerializer[] originalSerializers) { + //noinspection unchecked + super( + PrecomputedParameters.precompute( + true, true, (TypeSerializer[]) originalSerializers), + originalSerializers); + } + + private NodeSerializer(TypeSerializer[] nestedSerializers) { + this(null, nestedSerializers); + } + + @Override + public Node createInstance(Object... values) { + return new Node( + NodeField.ROW.get(values), + NodeField.SEQ_NO.get(values), + NodeField.PREV_SEQ_NO.get(values), + NodeField.NEXT_SEQ_NO.get(values), + NodeField.NEXT_SEQ_NO_FOR_RECORD.get(values), + NodeField.TIMESTAMP.get(values)); + } + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + protected Object getField(Node node, int index) { + return NodeField.get(node, index); + } + + @Override + protected CompositeSerializer createSerializerInstance( + PrecomputedParameters precomputed, TypeSerializer... 
originalSerializers) { + return new NodeSerializer(precomputed, originalSerializers); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new NodeSerializerSnapshot(this); + } + + @Override + protected void setField(Node value, int index, Object fieldValue) { + throw new UnsupportedOperationException(); + } + + private enum NodeField { + ROW { + @Override + Object get(Node node) { + return node.getRow(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return serializer; + } + }, + SEQ_NO { + @Override + Object get(Node node) { + return node.getSqn(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return LONG_SERIALIZER; + } + }, + PREV_SEQ_NO { + @Override + Object get(Node node) { + return node.getPrevSqn(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return NULLABLE_LONG_SERIALIZER; + } + }, + NEXT_SEQ_NO { + @Override + Object get(Node node) { + return node.getNextSqn(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return NULLABLE_LONG_SERIALIZER; + } + }, + NEXT_SEQ_NO_FOR_RECORD { + @Override + Object get(Node node) { + return node.getNextSqnForRecord(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return NULLABLE_LONG_SERIALIZER; + } + }, + TIMESTAMP { + @Override + Object get(Node node) { + return node.getTimestamp(); + } + + @Override + public TypeSerializer getSerializer(TypeSerializer serializer) { + return LONG_SERIALIZER; + } + }; + + private static TypeSerializer[] getFieldSerializers(TypeSerializer serializer) { + List> result = new ArrayList<>(); + for (NodeField field : values()) { + result.add(field.getSerializer(serializer)); + } + return result.toArray(new TypeSerializer[0]); + } + + public abstract TypeSerializer getSerializer(TypeSerializer serializer); + + abstract Object get(Node node); + + T get(Object... values) { + //noinspection unchecked + return (T) values[ordinal()]; + } + + public static Object get(Node node, int field) { + return values()[field].get(node); + } + } + + /** {@link TypeSerializerSnapshot} of {@link NodeSerializerSnapshot}. */ + public static class NodeSerializerSnapshot + extends CompositeTypeSerializerSnapshot { + @SuppressWarnings("unused") + public NodeSerializerSnapshot() {} + + NodeSerializerSnapshot(NodeSerializer nodeSerializer) { + super(nodeSerializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return 0; + } + + @Override + protected TypeSerializer[] getNestedSerializers(NodeSerializer outerSerializer) { + return outerSerializer.fieldSerializers; + } + + @Override + protected NodeSerializer createOuterSerializerWithNestedSerializers( + TypeSerializer[] nestedSerializers) { + return new NodeSerializer(nestedSerializers); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java new file mode 100644 index 0000000000000..9e449a5a42cf3 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKey.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.types.RowKind; + +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class wraps keys of type {@link RowData} for the following purposes: + * + *
    + *
  1. Fix the {@link RowKind} to be the same in all keys. + *
  2. Project the fields in case of upsert key. + *
  3. Fix {@link Object#equals(Object)} and hashCode for heap state backend. + *
  4. Potentially fix mutability for heap state backend (by copying using serializer) + *
+ */ +@Internal +class RowDataKey { + private final RecordEqualiser equaliser; + private final HashFunction hashFunction; + final RowData rowData; + + RowDataKey(RecordEqualiser equaliser, HashFunction hashFunction) { + this.equaliser = checkNotNull(equaliser); + this.hashFunction = checkNotNull(hashFunction); + this.rowData = null; + } + + public RowDataKey(RowData rowData, RecordEqualiser equaliser, HashFunction hashFunction) { + this.equaliser = checkNotNull(equaliser); + this.hashFunction = checkNotNull(hashFunction); + this.rowData = checkNotNull(rowData); + } + + public static RowDataKey toKeyNotProjected( + RowData row, RecordEqualiser equaliser, HashFunction hasher) { + return toKey(row, equaliser, hasher); + } + + public static RowDataKey toKey(RowData row, RecordEqualiser equaliser, HashFunction hasher) { + row.setRowKind(INSERT); + return new RowDataKey(row, equaliser, hasher); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof RowDataKey)) { + return false; + } + RowDataKey other = (RowDataKey) o; + return equaliser.equals(rowData, other.rowData); + } + + @Override + public int hashCode() { + return hashFunction.hashCode(rowData); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java new file mode 100644 index 0000000000000..9db229160cc05 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializer.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedHashFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; + +import java.io.IOException; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link TypeSerializer} for {@link RowDataKey}. 
*/ +@Internal +public class RowDataKeySerializer extends TypeSerializer { + final TypeSerializer serializer; + final GeneratedRecordEqualiser equaliser; // used to snapshot + final GeneratedHashFunction hashFunction; // used to snapshot + final RecordEqualiser equalizerInstance; // passed to restored keys + final HashFunction hashFunctionInstance; // passed to restored keys + + public RowDataKeySerializer( + TypeSerializer serializer, + RecordEqualiser equalizerInstance, + HashFunction hashFunctionInstance, + GeneratedRecordEqualiser equaliser, + GeneratedHashFunction hashFunction) { + this.serializer = checkNotNull(serializer); + this.equalizerInstance = checkNotNull(equalizerInstance); + this.hashFunctionInstance = checkNotNull(hashFunctionInstance); + this.equaliser = checkNotNull(equaliser); + this.hashFunction = checkNotNull(hashFunction); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer duplicate() { + return new RowDataKeySerializer( + serializer.duplicate(), + equalizerInstance, + hashFunctionInstance, + equaliser, + hashFunction); + } + + @Override + public RowDataKey createInstance() { + return new RowDataKey(equalizerInstance, hashFunctionInstance); + } + + @Override + public RowDataKey copy(RowDataKey from) { + return RowDataKey.toKeyNotProjected( + serializer.copy(from.rowData), equalizerInstance, hashFunctionInstance); + } + + @Override + public RowDataKey copy(RowDataKey from, RowDataKey reuse) { + return copy(from); + } + + @Override + public int getLength() { + return serializer.getLength(); + } + + @Override + public void serialize(RowDataKey record, DataOutputView target) throws IOException { + serializer.serialize(record.rowData, target); + } + + @Override + public RowDataKey deserialize(DataInputView source) throws IOException { + return RowDataKey.toKeyNotProjected( + serializer.deserialize(source), equalizerInstance, hashFunctionInstance); + } + + @Override + public RowDataKey deserialize(RowDataKey reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serializer.copy(source, target); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RowDataKeySerializer) { + RowDataKeySerializer other = (RowDataKeySerializer) obj; + return serializer.equals(other.serializer) + && equalizerInstance.equals(other.equalizerInstance) + && hashFunctionInstance.equals(other.hashFunctionInstance); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(serializer, equalizerInstance, hashFunctionInstance); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new RowDataKeySerializerSnapshot(this); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java new file mode 100644 index 0000000000000..08d9888e18b75 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedHashFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +import static org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link TypeSerializerSnapshot} of {@link RowDataKeySerializer}. */ +public class RowDataKeySerializerSnapshot implements TypeSerializerSnapshot { + + private RowDataKeySerializer serializer; + private TypeSerializerSnapshot restoredRowDataSerializerSnapshot; + + @SuppressWarnings("unused") + public RowDataKeySerializerSnapshot() { + // this constructor is used when restoring from a checkpoint/savepoint. 
+ }
+
+ public RowDataKeySerializerSnapshot(RowDataKeySerializer serializer) {
+ this.serializer = checkNotNull(serializer);
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return 0;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ store(serializer.equaliser, out);
+ store(serializer.hashFunction, out);
+ writeVersionedSnapshot(out, serializer.serializer.snapshotConfiguration());
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+ throws IOException {
+ checkArgument(readVersion == 0, "Unexpected version: " + readVersion);
+
+ GeneratedRecordEqualiser equaliser = restore(in, userCodeClassLoader);
+ GeneratedHashFunction hashFunction = restore(in, userCodeClassLoader);
+
+ restoredRowDataSerializerSnapshot =
+ TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader);
+
+ serializer =
+ new RowDataKeySerializer(
+ restoredRowDataSerializerSnapshot.restoreSerializer(),
+ equaliser.newInstance(userCodeClassLoader),
+ hashFunction.newInstance(userCodeClassLoader),
+ equaliser,
+ hashFunction);
+ }
+
+ private static void store(Object object, DataOutputView out) throws IOException {
+ byte[] bytes = InstantiationUtil.serializeObject(object);
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+ private T restore(DataInputView in, ClassLoader classLoader) throws IOException {
+ int len = in.readInt();
+ byte[] bytes = new byte[len];
+ in.readFully(bytes);
+ try {
+ return InstantiationUtil.deserializeObject(bytes, classLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public TypeSerializer restoreSerializer() {
+ return serializer;
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof RowDataKeySerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) oldSerializerSnapshot;
+
+ // the new (this) row-data snapshot resolves compatibility against the restored old one
+ TypeSerializerSchemaCompatibility compatibility =
+ serializer.serializer
+ .snapshotConfiguration()
+ .resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot);
+
+ return mapToOuterCompatibility(
+ compatibility,
+ serializer.equalizerInstance,
+ serializer.hashFunctionInstance,
+ serializer.equaliser,
+ serializer.hashFunction);
+ }
+
+ private static TypeSerializerSchemaCompatibility mapToOuterCompatibility(
+ TypeSerializerSchemaCompatibility rowDataCmp,
+ RecordEqualiser equaliserInstance,
+ HashFunction hashFunctionInstance,
+ GeneratedRecordEqualiser equaliser,
+ GeneratedHashFunction hashFunction) {
+ if (rowDataCmp.isCompatibleAsIs()) {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ } else if (rowDataCmp.isCompatibleAfterMigration()) {
+ return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+ } else if (rowDataCmp.isCompatibleWithReconfiguredSerializer()) {
+ return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+ new RowDataKeySerializer(
+ rowDataCmp.getReconfiguredSerializer(),
+ equaliserInstance,
+ hashFunctionInstance,
+ equaliser,
+ hashFunction));
+ } else if (rowDataCmp.isIncompatible()) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ throw new UnsupportedOperationException("Unknown compatibility mode: " + rowDataCmp);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java
new file mode 100644
index 0000000000000..b8bcc438b6820
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/** Stores first and last SQN for a record. */
+class RowSqnInfo {
+ public final long firstSqn;
+ public final long lastSqn;
+
+ public RowSqnInfo(long firstSqn, long lastSqn) {
+ Preconditions.checkArgument(firstSqn <= lastSqn);
+ this.firstSqn = firstSqn;
+ this.lastSqn = lastSqn;
+ }
+
+ public static RowSqnInfo ofSingle(long sqn) {
+ return of(sqn, sqn);
+ }
+
+ public static RowSqnInfo of(long first, long last) {
+ return new RowSqnInfo(first, last);
+ }
+
+ @Override
+ public String toString() {
+ return "RowSqnInfo{" + "firstSqn=" + firstSqn + ", lastSqn=" + lastSqn + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof RowSqnInfo)) {
+ return false;
+ }
+ RowSqnInfo that = (RowSqnInfo) o;
+ return firstSqn == that.firstSqn && lastSqn == that.lastSqn;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(firstSqn, lastSqn);
+ }
+
+ public RowSqnInfo withFirstSqn(long firstSqn) {
+ return RowSqnInfo.of(firstSqn, lastSqn);
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java
new file mode 100644
index 0000000000000..348071a22cdfd
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; + +@SuppressWarnings("ClassEscapesDefinedScope") +public class RowSqnInfoSerializer extends CompositeSerializer { + + public RowSqnInfoSerializer() { + this(null, LongSerializer.INSTANCE, LongSerializer.INSTANCE); + } + + protected RowSqnInfoSerializer( + PrecomputedParameters precomputed, TypeSerializer... fieldSerializers) { + super( + PrecomputedParameters.precompute( + true, true, (TypeSerializer[]) fieldSerializers), + fieldSerializers); + } + + @Override + public RowSqnInfo createInstance(Object... values) { + return new RowSqnInfo((Long) values[0], (Long) values[1]); + } + + @Override + protected void setField(RowSqnInfo sqnInfo, int index, Object fieldValue) { + throw new UnsupportedOperationException(); + } + + @Override + protected Object getField(RowSqnInfo value, int index) { + switch (index) { + case 0: + return value.firstSqn; + case 1: + return value.lastSqn; + default: + throw new IllegalArgumentException("invalid index: " + index); + } + } + + @Override + protected CompositeSerializer createSerializerInstance( + PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { + return new RowSqnInfoSerializer(precomputed, originalSerializers); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new RowSqnInfoSerializerSnapshot(this); + } + + public static class RowSqnInfoSerializerSnapshot + extends CompositeTypeSerializerSnapshot { + + @SuppressWarnings("unused") + public RowSqnInfoSerializerSnapshot() {} + + RowSqnInfoSerializerSnapshot(RowSqnInfoSerializer serializer) { + super(serializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return 0; + } + + @Override + protected TypeSerializer[] getNestedSerializers(RowSqnInfoSerializer outerSerializer) { + return new TypeSerializer[] {LongSerializer.INSTANCE, LongSerializer.INSTANCE}; + } + + @Override + protected RowSqnInfoSerializer createOuterSerializerWithNestedSerializers( + TypeSerializer[] nestedSerializers) { + return new RowSqnInfoSerializer(null, nestedSerializers); + } + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java new file mode 100644 index 0000000000000..f31db7c80b1b2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.SerializerFactory; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.InternalKeyContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedHashFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo; +import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType; +import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.Strategy; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.LogicalType; +import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.TriFunctionWithException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.LongStream; + +import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_ALL; +import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_LAST_ADDED; +import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND; +import static org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeType.REMOVAL_OTHER; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Test for various implementations of {@link SequencedMultiSetState}. 
*/ +@SuppressWarnings({"SameParameterValue", "unused"}) +@ExtendWith(ParameterizedTestExtension.class) +public class SequencedMultiSetStateTest { + + @Parameter(0) + private Strategy strategy; + + @Parameter(1) + private long adaptiveLowThresholdOverride; + + @Parameter(2) + private long adaptiveHighThresholdOverride; + + @Parameters(name = "strategy={0}, lowThreshold={1}, highThreshold={2}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {Strategy.VALUE_STATE, -1, -1}, + new Object[] {Strategy.MAP_STATE, -1, -1}, + new Object[] {Strategy.ADAPTIVE, 0, 1}, + new Object[] {Strategy.ADAPTIVE, 1, 2}, + new Object[] {Strategy.ADAPTIVE, 0, 10}, + new Object[] {Strategy.ADAPTIVE, 9, 10}, + }; + } + + // for simplicity, all tests use string type only, with row key being the 1st column + private static final LogicalType VARCHAR = DataTypes.VARCHAR(50).getLogicalType(); + public static final int KEY_POS = 0; + + @TestTemplate + public void testBasicFlow() throws Exception { + runTest( + (state, keyContext) -> { + keyContext.setCurrentKey("sk1"); + assertTrue(state.isEmpty()); + + state.add(row("key", "value"), 1L); + assertFalse(state.isEmpty()); + + keyContext.setCurrentKey("sk2"); + assertTrue(state.isEmpty()); + + keyContext.setCurrentKey("sk1"); + state.clear(); + assertStateContents(state); + }); + } + + @TestTemplate + public void testAppend() throws Exception { + runTest( + state -> { + // should always keep appending + state.append(row("k1", "x"), 777L); + assertStateContents(state, Tuple2.of(row("k1", "x"), 777L)); + + state.append(row("k1", "x"), 778L); + assertStateContents( + state, + Tuple2.of(row("k1", "x"), 777L), + Tuple2.of(row("k1", "x"), 778L)); + + state.append(row("k2", "y"), 779L); + assertStateContents( + state, + Tuple2.of(row("k1", "x"), 777L), + Tuple2.of(row("k1", "x"), 778L), + Tuple2.of(row("k2", "y"), 779L)); + + state.append(row("k1", "x"), 777L); + assertStateContents( + state, + Tuple2.of(row("k1", "x"), 777L), + Tuple2.of(row("k1", "x"), 778L), + Tuple2.of(row("k2", "y"), 779L), + Tuple2.of(row("k1", "x"), 777L)); + }); + } + + @TestTemplate + public void testAdd() throws Exception { + runTest( + state -> { + state.add(row("k1", "x"), 777L); + assertStateContents(state, row("k1", "x"), 777L); + + state.add(row("k2", "x"), 777L); + assertStateContents( + state, + Tuple2.of(row("k1", "x"), 777L), + Tuple2.of(row("k2", "x"), 777L)); + + state.add(row("k2", "y"), 778L); + assertStateContents( + state, + Tuple2.of(row("k1", "x"), 777L), + Tuple2.of(row("k2", "y"), 778L)); + + state.add(row("k1", "y"), 778L); + assertStateContents( + state, + Tuple2.of(row("k1", "y"), 778L), + Tuple2.of(row("k2", "y"), 778L)); + }); + } + + @TestTemplate + public void testRemove() throws Exception { + runTest( + state -> { + removeAndAssert(state, row("key1"), REMOVAL_NOT_FOUND); + + state.add(row("key1", "value"), 777L); + state.add(row("key2", "value"), 777L); + state.add(row("key3", "value"), 777L); + state.add(row("key4", "value"), 777L); + + removeAndAssert(state, row("key999"), REMOVAL_NOT_FOUND); + removeAndAssert(state, row("key4"), REMOVAL_LAST_ADDED, row("key3", "value")); + removeAndAssert(state, row("key3"), REMOVAL_LAST_ADDED, row("key2", "value")); + removeAndAssert(state, row("key1"), REMOVAL_OTHER); + removeAndAssert( + state, + row("key2", "value-to-return"), + REMOVAL_ALL, + // value-to-return should be returned, not the original value + // according to the current logic of Flink operators + row("key2", "value-to-return")); + + // 
shouldn't fail e.g. due to bad pointers
+ removeAndAssert(state, row("key1"), REMOVAL_NOT_FOUND);
+ removeAndAssert(state, row("key2"), REMOVAL_NOT_FOUND);
+ });
+ }
+
+ @TestTemplate
+ public void testAddAfterRemovingTail() throws Exception {
+ runTest(
+ state -> {
+ state.add(row("key1", "value-1"), 777L);
+ state.add(row("key2", "value-2"), 777L);
+
+ // key1 is the tail now - remove it and then add
+ removeAndAssert(state, row("key1"), REMOVAL_OTHER, row("key1", "value-1"));
+ state.add(row("key1", "value-1"), 777L);
+
+ // key2 is the tail now - remove it and then add
+ removeAndAssert(state, row("key2"), REMOVAL_OTHER, row("key2", "value-2"));
+ state.add(row("key2", "value-2"), 777L);
+ });
+ }
+
+ @TestTemplate
+ public void testRemoveFirstAppended() throws Exception {
+ runTest(
+ state -> {
+ state.append(row("key", "value-1"), 777L);
+ state.append(row("key", "value-2"), 778L);
+ state.append(row("key", "value-3"), 779L);
+
+ removeAndAssert(state, row("key"), REMOVAL_OTHER);
+ assertStateContents(
+ state,
+ Tuple2.of(row("key", "value-2"), 778L),
+ Tuple2.of(row("key", "value-3"), 779L));
+
+ removeAndAssert(state, row("key"), REMOVAL_OTHER);
+ assertStateContents(state, Tuple2.of(row("key", "value-3"), 779L));
+
+ removeAndAssert(state, row("key"), REMOVAL_ALL, row("key"));
+ assertTrue(state.isEmpty());
+ });
+ }
+
+ @TestTemplate
+ public void testRemoveWithInterleavingRowAppended() throws Exception {
+ runTest(
+ state -> {
+ state.append(row("key1", "value"), 777L); // sqn = 1
+ state.append(row("key2", "value"), 777L); // sqn = 2
+ state.append(row("key2", "value"), 778L); // sqn = 3
+ removeAndAssert(state, row("key2"), REMOVAL_OTHER, row("key2", "value"));
+ removeAndAssert(state, row("key2"), REMOVAL_LAST_ADDED, row("key1", "value"));
+ removeAndAssert(
+ state,
+ row("key1", "value-to-return"),
+ REMOVAL_ALL,
+ row("key1", "value-to-return"));
+ });
+ }
+
+ /** Test that loading and clearing the cache doesn't impact correctness. */
+ @TestTemplate
+ public void testCaching() throws Exception {
+ runTest(
+ (state, ctx) -> {
+ ctx.setCurrentKey("sk1");
+ state.add(row("key", "value-1"), 777L);
+ state.clearCache();
+ assertFalse(state.isEmpty());
+
+ ctx.setCurrentKey("sk2");
+ state.loadCache();
+ assertTrue(state.isEmpty());
+ });
+ }
+
+ /** Test that rows are matched on the key extracted by the key extractor, not on the full row. */
+ @TestTemplate
+ public void testKeyExtraction() throws Exception {
+ final Function keyExtractor =
+ row -> ProjectedRowData.from(new int[] {1}).replaceRow(row);
+
+ runTest(
+ (state, ctx) -> {
+ ctx.setCurrentKey("sk1");
+ state.add(row("value-123", "key"), 777L);
+ assertFalse(state.isEmpty());
+ StateChangeInfo ret = state.remove(row("value-456", "key"));
+ assertEquals(REMOVAL_ALL, ret.getChangeType());
+ assertEquals(0, ret.getSizeAfter());
+ assertTrue(state.isEmpty());
+ },
+ keyExtractor,
+ 0);
+ }
+
+ /** Test that row kind is not taken into account when matching the rows.
*/ + @TestTemplate + public void testRowKindNormalization() throws Exception { + runTest( + state -> { + for (RowKind firstKind : RowKind.values()) { + for (RowKind secondKind : RowKind.values()) { + + state.append(rowOfKind(firstKind, "key", "value"), 778L); + state.remove(rowOfKind(secondKind, "key", "value")); + assertTrue(state.isEmpty()); + + state.add(rowOfKind(firstKind, "key", "value"), 777L); + state.remove(rowOfKind(secondKind, "key", "value")); + assertTrue(state.isEmpty()); + + state.add(rowOfKind(firstKind, "key", "value"), 777L); + state.add(rowOfKind(secondKind, "key", "value"), 778L); + assertStateContents(state, Tuple2.of(row("key", "value"), 778L)); + state.clear(); + } + } + }); + } + + @TestTemplate + public void testAdaptivity() throws Exception { + assumeTrue(strategy == Strategy.ADAPTIVE); + final long totalSize = adaptiveHighThresholdOverride * 2; + runTest( + (state, ctx) -> { + AdaptiveSequencedMultiSetState ad = (AdaptiveSequencedMultiSetState) state; + int runningSize = 0; + + ctx.setCurrentKey("k1"); + assertFalse(ad.isIsUsingLargeState(), "should start with value state"); + for (; runningSize < totalSize; runningSize++) { + assertEquals( + runningSize >= adaptiveHighThresholdOverride, + ad.isIsUsingLargeState(), + "should switch after reaching high threshold"); + ad.append(row("key", "value"), runningSize /* timestamp */); + } + + ctx.setCurrentKey("k2"); + assertFalse(ad.isIsUsingLargeState(), "should not mix different context keys"); + + ctx.setCurrentKey("k1"); + assertTrue(ad.isIsUsingLargeState(), "should not mix different context keys"); + + // remove until hitting the threshold - shouldn't trigger switch + for (; runningSize > adaptiveLowThresholdOverride + 1; runningSize--) { + ad.remove(row("key")); + assertTrue( + ad.isIsUsingLargeState(), + "should switch back after reaching low threshold"); + } + // trigger switch + ad.remove(row("key")); + runningSize--; + assertFalse( + ad.isIsUsingLargeState(), + "should switch back after reaching low threshold"); + // verify the order of the migrated elements by looking at their timestamps + //noinspection unchecked + assertStateContents( + state, + LongStream.range(totalSize - runningSize, totalSize) + .mapToObj(ts -> Tuple2.of(row("key", "value"), ts)) + .toArray(Tuple2[]::new)); + for (; runningSize > 0; runningSize--) { + assertFalse( + ad.isIsUsingLargeState(), + "should switch back after reaching low threshold"); + ad.remove(row("key")); + } + assertTrue(ad.isEmpty()); + assertEquals(0, runningSize); + + for (; runningSize < totalSize; runningSize++) { + assertEquals( + runningSize >= adaptiveHighThresholdOverride, + ad.isIsUsingLargeState(), + "should switch after reaching high threshold"); + ad.add(row(Integer.toString(runningSize), "value"), 777L); + } + assertTrue(ad.isIsUsingLargeState()); + + state.clear(); + assertFalse( + ad.isIsUsingLargeState(), "should switch to value state after clear"); + }); + } + + @TestTemplate + public void testAddReturnValues() throws Exception { + testReturnValues(SequencedMultiSetState::add); + } + + @TestTemplate + public void testAppendReturnValues() throws Exception { + testReturnValues(SequencedMultiSetState::append); + } + + private void testReturnValues( + TriFunctionWithException< + SequencedMultiSetState, + RowData, + Long, + StateChangeInfo, + Exception> + updateFn) + throws Exception { + runTest( + state -> { + StateChangeInfo ret; + + ret = updateFn.apply(state, row("key-1", "value"), 777L); + assertEquals(StateChangeType.ADDITION, ret.getChangeType()); + 
assertEquals(1, ret.getSizeAfter()); + assertTrue(ret.wasEmpty()); + + ret = updateFn.apply(state, row("key-2", "value"), 777L); + assertEquals(StateChangeType.ADDITION, ret.getChangeType()); + assertEquals(2, ret.getSizeAfter()); + assertFalse(ret.wasEmpty()); + + removeAndAssert(state, row("key-1"), REMOVAL_OTHER); + removeAndAssert(state, row("key-2"), REMOVAL_ALL, row("key-2")); + + ret = updateFn.apply(state, row("key-3", "value"), 777L); + assertEquals(StateChangeType.ADDITION, ret.getChangeType()); + assertEquals(1, ret.getSizeAfter()); + assertTrue(ret.wasEmpty()); + }); + } + + private void runTest(ThrowingConsumer, Exception> test) + throws Exception { + runTest( + (state, keyContext) -> { + keyContext.setCurrentKey("key1"); + test.accept(state); + }); + } + + private void runTest( + BiConsumerWithException< + SequencedMultiSetState, InternalKeyContext, Exception> + test) + throws Exception { + runTest(test, Function.identity(), KEY_POS); + } + + private void runTest( + BiConsumerWithException< + SequencedMultiSetState, InternalKeyContext, Exception> + test, + Function keyExtractor, + int keyPos) + throws Exception { + SequencedMultiSetStateContext p = + new SequencedMultiSetStateContext( + new RowDataSerializer(VARCHAR), + new MyGeneratedEqualiser(keyPos), + new MyGeneratedHashFunction(keyPos), + new RowDataSerializer(VARCHAR, VARCHAR), + keyExtractor, + new SequencedMultiSetStateConfig( + strategy, + adaptiveHighThresholdOverride, + adaptiveLowThresholdOverride, + StateTtlConfig.DISABLED, + TimeDomain.EVENT_TIME)); + + MockEnvironment env = new MockEnvironmentBuilder().build(); + + AbstractKeyedStateBackend stateBackend = + getKeyedStateBackend(env, StringSerializer.INSTANCE); + + RuntimeContext ctx = + new StreamingRuntimeContext( + env, + Collections.emptyMap(), + UnregisteredMetricsGroup.createOperatorMetricGroup(), + new OperatorID(), + new TestProcessingTimeService(), + getKeyedStateStore(stateBackend), + ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES); + + test.accept(SequencedMultiSetState.create(p, ctx, "hashmap"), stateBackend); + } + + private static KeyedStateStore getKeyedStateStore(KeyedStateBackend stateBackend) { + return new DefaultKeyedStateStore( + stateBackend, + new SerializerFactory() { + @Override + public TypeSerializer createSerializer(TypeInformation ti) { + return ti.createSerializer(new SerializerConfigImpl()); + } + }); + } + + private static AbstractKeyedStateBackend getKeyedStateBackend( + MockEnvironment env, TypeSerializer keySerializer) throws IOException { + String op = "test-operator"; + JobID jobId = new JobID(); + JobVertexID jobVertexId = new JobVertexID(); + KeyGroupRange emptyKeyGroupRange = KeyGroupRange.of(0, 10); + int numberOfKeyGroups = emptyKeyGroupRange.getNumberOfKeyGroups(); + + return new HashMapStateBackend() + .createKeyedStateBackend( + new KeyedStateBackendParametersImpl<>( + env, + jobId, + op, + keySerializer, + numberOfKeyGroups, + emptyKeyGroupRange, + new KvStateRegistry().createTaskRegistry(jobId, jobVertexId), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry())); + } + + private static class MyGeneratedEqualiser extends GeneratedRecordEqualiser { + + private final int keyPos; + + public MyGeneratedEqualiser(int keyPos) { + super("", "", new Object[0]); + this.keyPos = keyPos; + } + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(keyPos); + } + } + + private static class 
MyGeneratedHashFunction extends GeneratedHashFunction { + + private final int keyPos; + + public MyGeneratedHashFunction(int keyPos) { + super("", "", new Object[0], new Configuration()); + this.keyPos = keyPos; + } + + @Override + public HashFunction newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(keyPos); + } + } + + private static class TestRecordEqualiser implements RecordEqualiser, HashFunction { + + private final int keyPos; + + private TestRecordEqualiser(int keyPos) { + this.keyPos = keyPos; + } + + @Override + public boolean equals(RowData row1, RowData row2) { + return row1.getRowKind() == row2.getRowKind() + && row1.getString(keyPos).equals(row2.getString(keyPos)); + } + + @Override + public int hashCode(Object data) { + RowData rd = (RowData) data; + return Objects.hash(rd.getRowKind(), rd.getString(keyPos)); + } + } + + private static void assertStateContents( + SequencedMultiSetState state, RowData rowData, Long timestamp) + throws Exception { + assertStateContents(state, Tuple2.of(rowData, timestamp)); + } + + @SafeVarargs + private static void assertStateContents( + SequencedMultiSetState state, Tuple2... expectedArr) + throws Exception { + List> actual = new ArrayList<>(); + state.iterator().forEachRemaining(actual::add); + assertEquals(expectedArr.length == 0, state.isEmpty()); + assertEquals(expectedArr.length, actual.size()); + Assertions.assertArrayEquals(expectedArr, actual.toArray()); + } + + private static void removeAndAssert( + SequencedMultiSetState state, + RowData key, + StateChangeType expectedResultType, + RowData... expectedReturnedRow) + throws Exception { + StateChangeInfo ret = state.remove(key); + + assertEquals(expectedResultType, ret.getChangeType()); + switch (ret.getChangeType()) { + case REMOVAL_NOT_FOUND: + assertEquals(Optional.empty(), ret.getPayload()); + break; + case REMOVAL_ALL: + assertEquals(0, ret.getSizeAfter()); + assertTrue(state.isEmpty(), "state is expected to be empty"); + assertEquals(Optional.of(expectedReturnedRow[0]), ret.getPayload()); + break; + case REMOVAL_OTHER: + assertFalse(state.isEmpty(), "state is expected to be non-empty"); + assertEquals(Optional.empty(), ret.getPayload()); + break; + case REMOVAL_LAST_ADDED: + assertFalse(state.isEmpty(), "state is expected to be non-empty"); + assertEquals(Optional.of(expectedReturnedRow[0]), ret.getPayload()); + break; + } + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java new file mode 100644 index 0000000000000..92bf9d29aaf20 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/MetaSqnInfoSerializerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Test for {@link MetaSqnInfoSerializer}. */
+class MetaSqnInfoSerializerTest extends SerializerTestBase {
+
+ @Override
+ protected TypeSerializer createSerializer() {
+ return new MetaSqnInfoSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 2 * Long.SIZE / 8;
+ }
+
+ @Override
+ protected Class getTypeClass() {
+ return MetaSqnInfo.class;
+ }
+
+ @Override
+ protected MetaSqnInfo[] getTestData() {
+ return new MetaSqnInfo[] {MetaSqnInfo.of(1L, 2L), MetaSqnInfo.of(1L, 1L)};
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java
new file mode 100644
index 0000000000000..1dbdaf3b3551a
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/NodeSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.sequencedmultisetstate.linked;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordUtils;
+import org.apache.flink.table.types.logical.IntType;
+
+/** Test for {@link NodeSerializer}.
*/ +public class NodeSerializerTest extends SerializerTestBase { + + @Override + protected TypeSerializer createSerializer() { + return new NodeSerializer(new RowDataSerializer(new IntType())); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return Node.class; + } + + @Override + protected Node[] getTestData() { + return new Node[] { + new Node(StreamRecordUtils.row(1), 1L, null, 2L, 2L, 1L), + new Node(StreamRecordUtils.row(2), 2L, 1L, 3L, 3L, 2L), + new Node(StreamRecordUtils.row(3), 3L, 2L, null, null, 3L), + }; + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java new file mode 100644 index 0000000000000..f78faf9f34317 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedHashFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.StreamRecordUtils; +import org.apache.flink.table.types.logical.IntType; + +import java.util.Objects; + +/** Test for {@link RowDataKeySerializer}. 
*/ +public class RowDataKeySerializerTest extends SerializerTestBase { + + private final TestRecordEqualiser equaliser = new TestRecordEqualiser(); + + @Override + protected TypeSerializer createSerializer() { + return new RowDataKeySerializer( + new RowDataSerializer(new IntType()), + equaliser, + equaliser, + EQUALISER, + HASH_FUNCTION); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class getTypeClass() { + return RowDataKey.class; + } + + @Override + protected RowDataKey[] getTestData() { + return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), equaliser, equaliser)}; + } + + static final GeneratedRecordEqualiser EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + static final GeneratedHashFunction HASH_FUNCTION = + new GeneratedHashFunction("", "", new Object[0], new Configuration()) { + @Override + public HashFunction newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static class TestRecordEqualiser implements RecordEqualiser, HashFunction { + @Override + public boolean equals(RowData row1, RowData row2) { + return row1.getRowKind() == row2.getRowKind() && row1.getInt(0) == row2.getInt(0); + } + + @Override + public int hashCode(Object data) { + RowData rd = (RowData) data; + return Objects.hash(rd.getRowKind(), rd.getInt(0)); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestRecordEqualiser; + } + + @Override + public int hashCode() { + return 0; + } + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java new file mode 100644 index 0000000000000..fd584c3862fd0 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowSqnInfoSerializerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.sequencedmultisetstate.linked; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** Test for {@link RowSqnInfoSerializer}. 
*/ +class RowSqnInfoSerializerTest extends SerializerTestBase { + + @Override + protected TypeSerializer createSerializer() { + return new RowSqnInfoSerializer(); + } + + @Override + protected int getLength() { + return 2 * Long.SIZE / 8; + } + + @Override + protected Class getTypeClass() { + return RowSqnInfo.class; + } + + @Override + protected RowSqnInfo[] getTestData() { + return new RowSqnInfo[] {RowSqnInfo.of(1L, 2L), RowSqnInfo.of(1L, 1L)}; + } +} diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 1988f8e63b52d..e4e9de6e3ddd0 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -193,6 +193,14 @@ under the License. test + + org.apache.flink + flink-table-runtime + ${project.version} + test-jar + test + + org.apache.flink flink-runtime diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java index 691460a4012d9..976cef6349b25 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java @@ -74,6 +74,10 @@ import org.apache.flink.table.dataview.NullAwareMapSerializer; import org.apache.flink.table.dataview.NullSerializer; import org.apache.flink.table.runtime.operators.window.CountWindow; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowDataKeySerializer; +import org.apache.flink.table.runtime.sequencedmultisetstate.linked.RowSqnInfoSerializer; import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.runtime.typeutils.DecimalDataSerializer; @@ -255,6 +259,10 @@ public void testTypeSerializerTestCoverage() { // KeyAndValueSerializer shouldn't be used to serialize data to state and // doesn't need to ensure upgrade compatibility. "org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer", + RowDataKeySerializer.class.getName(), + NodeSerializer.class.getName(), + RowSqnInfoSerializer.class.getName(), + MetaSqnInfoSerializer.class.getName(), SetSerializer.class.getName()); // check if a test exists for each type serializer
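Reviewer note: the serializers above are exercised through SerializerTestBase, but a plain round-trip makes the serialize/deserialize contract easier to see in isolation. A minimal sketch, assuming the DataOutputSerializer and DataInputDeserializer utilities from org.apache.flink.core.memory, and a hypothetical harness class placed in the same package as the package-private RowSqnInfo; everything else is taken from this change:

package org.apache.flink.table.runtime.sequencedmultisetstate.linked;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Hypothetical harness class (assumption), colocated with the package-private RowSqnInfo.
class RowSqnInfoRoundTrip {
    public static void main(String[] args) throws Exception {
        RowSqnInfoSerializer serializer = new RowSqnInfoSerializer();
        RowSqnInfo original = RowSqnInfo.of(1L, 5L);

        // RowSqnInfo is two longs, so the fixed length is 2 * Long.SIZE / 8 = 16 bytes,
        // matching getLength() asserted in RowSqnInfoSerializerTest above.
        DataOutputSerializer out = new DataOutputSerializer(16);
        serializer.serialize(original, out);

        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        RowSqnInfo restored = serializer.deserialize(in);

        if (!original.equals(restored)) {
            throw new AssertionError("round-trip mismatch: " + restored);
        }
    }
}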