diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java index 168ca6e10528f..1f94beb33f80d 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java @@ -41,6 +41,8 @@ import java.nio.file.Path; import java.time.Duration; +import static org.junit.jupiter.api.Assumptions.assumeFalse; + /** Tests for {@link ChangelogStateBackend} delegating {@link EmbeddedRocksDBStateBackend}. */ public class ChangelogDelegateEmbeddedRocksDBStateBackendTest extends EmbeddedRocksDBStateBackendTest { @@ -62,11 +64,6 @@ protected boolean supportsMetaInfoVerification() { return false; } - @Override - protected boolean isSafeToReuseKVState() { - return true; - } - @TestTemplate @Disabled("The type of handle returned from snapshot() is not incremental") public void testSharedIncrementalStateDeRegistration() {} @@ -118,6 +115,9 @@ public void testMaterializedRestoreWithWrappedState() throws Exception { @TestTemplate public void testMaterializedRestorePriorityQueue() throws Exception { + assumeFalse( + useHeapTimer, + "Heap priority queue does not support restore test on managed keyed state"); CheckpointStreamFactory streamFactory = createStreamFactory(); ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index f1aa35d0300d0..4f410ca4287ae 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -938,7 +938,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy @Override public boolean isSafeToReuseKVState() { - return true; + return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory); } @VisibleForTesting diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java index b3d8bc8e364d9..08945e75567f7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java @@ -51,6 +51,7 @@ import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES; +import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY; import static org.assertj.core.api.Assertions.assertThat; /** Tests for the partitioned state part of {@link ForStStateBackendTest}. */ @@ -64,7 +65,8 @@ public static List modes() { new Object[][] { { (SupplierWithException) - JobManagerCheckpointStorage::new + JobManagerCheckpointStorage::new, + false }, { (SupplierWithException) @@ -73,12 +75,17 @@ public static List modes() { TempDirUtils.newFolder(tempFolder).toURI().toString(); return new FileSystemCheckpointStorage( new Path(checkpointPath), 0, -1); - } + }, + true } }); } - @Parameter public SupplierWithException storageSupplier; + @Parameter(value = 0) + public SupplierWithException storageSupplier; + + @Parameter(value = 1) + public boolean useHeapTimer; @Override protected CheckpointStorage getCheckpointStorage() throws Exception { @@ -91,6 +98,11 @@ protected ConfigurableStateBackend getStateBackend() throws Exception { Configuration config = new Configuration(); config.set(LOCAL_DIRECTORIES, tempFolder.toString()); config.set(USE_INGEST_DB_RESTORE_MODE, true); + config.set( + TIMER_SERVICE_FACTORY, + useHeapTimer + ? ForStStateBackend.PriorityQueueStateType.HEAP + : ForStStateBackend.PriorityQueueStateType.ForStDB); return backend.configure(config, Thread.currentThread().getContextClassLoader()); } @@ -109,7 +121,7 @@ protected boolean supportsAsynchronousSnapshots() { */ @Override protected boolean isSafeToReuseKVState() { - return true; + return !useHeapTimer; } @TestTemplate diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java index 0cf13b3384b3a..b77232cb067b3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java @@ -1108,7 +1108,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy @Override public boolean isSafeToReuseKVState() { - return true; + return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java index b188f9afae5ce..6e40503c2add0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java @@ -99,6 +99,7 @@ import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; +import static org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -131,13 +132,15 @@ public static List modes() { true, (SupplierWithException) JobManagerCheckpointStorage::new, + false, false }, { true, (SupplierWithException) JobManagerCheckpointStorage::new, - true + true, + false }, { false, @@ -148,8 +151,16 @@ public static List modes() { return new FileSystemCheckpointStorage( new Path(checkpointPath), 0, -1); }, + false, false - } + }, + { + true, + (SupplierWithException) + JobManagerCheckpointStorage::new, + false, + true + }, }); } @@ -162,6 +173,9 @@ public static List modes() { @Parameter(value = 2) public boolean useIngestDB; + @Parameter(value = 3) + public boolean useHeapTimer; + // Store it because we need it for the cleanup test. private String dbPath; private RocksDB db = null; @@ -203,8 +217,10 @@ private Configuration createBackendConfig() { Configuration configuration = new Configuration(); configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB); configuration.set( - RocksDBOptions.TIMER_SERVICE_FACTORY, - EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); + TIMER_SERVICE_FACTORY, + useHeapTimer + ? EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP + : EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1)); return configuration; } @@ -234,7 +250,7 @@ protected boolean supportsAsynchronousSnapshots() { @Override protected boolean isSafeToReuseKVState() { - return true; + return !useHeapTimer; } // small safety net for instance cleanups, so that no native objects are left