Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ protected boolean supportsMetaInfoVerification() {
return false;
}

@Override
protected boolean isSafeToReuseKVState() {
return true;
}

@TestTemplate
@Disabled("The type of handle returned from snapshot() is not incremental")
public void testSharedIncrementalStateDeRegistration() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy

@Override
public boolean isSafeToReuseKVState() {
return true;
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is set in the constructor - it would be better to do the instanceof there and then set an instance field flag. At the moment we do the instanceof every call of this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you read the code that invokes this method, you may find it only invoked once when the subtask start. So don't worry about the performance. I'd prefer keep this as it's more readable.

}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
Expand All @@ -64,7 +65,8 @@ public static List<Object[]> modes() {
new Object[][] {
{
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new
JobManagerCheckpointStorage::new,
false
},
{
(SupplierWithException<CheckpointStorage, IOException>)
Expand All @@ -73,12 +75,17 @@ public static List<Object[]> modes() {
TempDirUtils.newFolder(tempFolder).toURI().toString();
return new FileSystemCheckpointStorage(
new Path(checkpointPath), 0, -1);
}
},
true
}
});
}

@Parameter public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
@Parameter(value = 0)
public SupplierWithException<CheckpointStorage, IOException> storageSupplier;

@Parameter(value = 1)
public boolean useHeapTimer;

@Override
protected CheckpointStorage getCheckpointStorage() throws Exception {
Expand All @@ -91,6 +98,11 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
Configuration config = new Configuration();
config.set(LOCAL_DIRECTORIES, tempFolder.toString());
config.set(USE_INGEST_DB_RESTORE_MODE, true);
config.set(
TIMER_SERVICE_FACTORY,
useHeapTimer
? ForStStateBackend.PriorityQueueStateType.HEAP
: ForStStateBackend.PriorityQueueStateType.ForStDB);
return backend.configure(config, Thread.currentThread().getContextClassLoader());
}

Expand All @@ -109,7 +121,7 @@ protected boolean supportsAsynchronousSnapshots() {
*/
@Override
protected boolean isSafeToReuseKVState() {
return true;
return !useHeapTimer;
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy

@Override
public boolean isSafeToReuseKVState() {
return true;
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -131,13 +132,15 @@ public static List<Object[]> modes() {
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new,
false,
false
},
{
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new,
true
true,
false
},
{
false,
Expand All @@ -148,7 +151,8 @@ public static List<Object[]> modes() {
return new FileSystemCheckpointStorage(
new Path(checkpointPath), 0, -1);
},
false
false,
true
}
});
}
Expand All @@ -162,6 +166,9 @@ public static List<Object[]> modes() {
@Parameter(value = 2)
public boolean useIngestDB;

@Parameter(value = 3)
public boolean useHeapTimer;

// Store it because we need it for the cleanup test.
private String dbPath;
private RocksDB db = null;
Expand Down Expand Up @@ -203,8 +210,10 @@ private Configuration createBackendConfig() {
Configuration configuration = new Configuration();
configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
configuration.set(
RocksDBOptions.TIMER_SERVICE_FACTORY,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
TIMER_SERVICE_FACTORY,
useHeapTimer
? EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP
: EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1));
return configuration;
}
Expand Down Expand Up @@ -234,7 +243,7 @@ protected boolean supportsAsynchronousSnapshots() {

@Override
protected boolean isSafeToReuseKVState() {
return true;
return !useHeapTimer;
}

// small safety net for instance cleanups, so that no native objects are left
Expand Down