Skip to content

Commit 1162ac0

Browse files
committed
[FLINK-38558] Require copy when use heap timer
1 parent 8aaeafc commit 1162ac0

File tree

5 files changed

+44
-16
lines changed

5 files changed

+44
-16
lines changed

flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.nio.file.Path;
4242
import java.time.Duration;
4343

44+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
45+
4446
/** Tests for {@link ChangelogStateBackend} delegating {@link EmbeddedRocksDBStateBackend}. */
4547
public class ChangelogDelegateEmbeddedRocksDBStateBackendTest
4648
extends EmbeddedRocksDBStateBackendTest {
@@ -62,11 +64,6 @@ protected boolean supportsMetaInfoVerification() {
6264
return false;
6365
}
6466

65-
@Override
66-
protected boolean isSafeToReuseKVState() {
67-
return true;
68-
}
69-
7067
@TestTemplate
7168
@Disabled("The type of handle returned from snapshot() is not incremental")
7269
public void testSharedIncrementalStateDeRegistration() {}
@@ -118,6 +115,9 @@ public void testMaterializedRestoreWithWrappedState() throws Exception {
118115

119116
@TestTemplate
120117
public void testMaterializedRestorePriorityQueue() throws Exception {
118+
assumeFalse(
119+
useHeapTimer,
120+
"Heap priority queue does not support restore test on managed keyed state");
121121
CheckpointStreamFactory streamFactory = createStreamFactory();
122122

123123
ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue(

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy
938938

939939
@Override
940940
public boolean isSafeToReuseKVState() {
941-
return true;
941+
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
942942
}
943943

944944
@VisibleForTesting

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
5353
import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
54+
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
5455
import static org.assertj.core.api.Assertions.assertThat;
5556

5657
/** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
@@ -64,7 +65,8 @@ public static List<Object[]> modes() {
6465
new Object[][] {
6566
{
6667
(SupplierWithException<CheckpointStorage, IOException>)
67-
JobManagerCheckpointStorage::new
68+
JobManagerCheckpointStorage::new,
69+
false
6870
},
6971
{
7072
(SupplierWithException<CheckpointStorage, IOException>)
@@ -73,12 +75,17 @@ public static List<Object[]> modes() {
7375
TempDirUtils.newFolder(tempFolder).toURI().toString();
7476
return new FileSystemCheckpointStorage(
7577
new Path(checkpointPath), 0, -1);
76-
}
78+
},
79+
true
7780
}
7881
});
7982
}
8083

81-
@Parameter public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
84+
@Parameter(value = 0)
85+
public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
86+
87+
@Parameter(value = 1)
88+
public boolean useHeapTimer;
8289

8390
@Override
8491
protected CheckpointStorage getCheckpointStorage() throws Exception {
@@ -91,6 +98,11 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
9198
Configuration config = new Configuration();
9299
config.set(LOCAL_DIRECTORIES, tempFolder.toString());
93100
config.set(USE_INGEST_DB_RESTORE_MODE, true);
101+
config.set(
102+
TIMER_SERVICE_FACTORY,
103+
useHeapTimer
104+
? ForStStateBackend.PriorityQueueStateType.HEAP
105+
: ForStStateBackend.PriorityQueueStateType.ForStDB);
94106
return backend.configure(config, Thread.currentThread().getContextClassLoader());
95107
}
96108

@@ -109,7 +121,7 @@ protected boolean supportsAsynchronousSnapshots() {
109121
*/
110122
@Override
111123
protected boolean isSafeToReuseKVState() {
112-
return true;
124+
return !useHeapTimer;
113125
}
114126

115127
@TestTemplate

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy
11081108

11091109
@Override
11101110
public boolean isSafeToReuseKVState() {
1111-
return true;
1111+
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
11121112
}
11131113

11141114
@Override

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
100100
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
101101
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
102+
import static org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY;
102103
import static org.assertj.core.api.Assertions.assertThat;
103104
import static org.assertj.core.api.Assertions.assertThatThrownBy;
104105
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,13 +132,15 @@ public static List<Object[]> modes() {
131132
true,
132133
(SupplierWithException<CheckpointStorage, IOException>)
133134
JobManagerCheckpointStorage::new,
135+
false,
134136
false
135137
},
136138
{
137139
true,
138140
(SupplierWithException<CheckpointStorage, IOException>)
139141
JobManagerCheckpointStorage::new,
140-
true
142+
true,
143+
false
141144
},
142145
{
143146
false,
@@ -148,8 +151,16 @@ public static List<Object[]> modes() {
148151
return new FileSystemCheckpointStorage(
149152
new Path(checkpointPath), 0, -1);
150153
},
154+
false,
151155
false
152-
}
156+
},
157+
{
158+
true,
159+
(SupplierWithException<CheckpointStorage, IOException>)
160+
JobManagerCheckpointStorage::new,
161+
false,
162+
true
163+
},
153164
});
154165
}
155166

@@ -162,6 +173,9 @@ public static List<Object[]> modes() {
162173
@Parameter(value = 2)
163174
public boolean useIngestDB;
164175

176+
@Parameter(value = 3)
177+
public boolean useHeapTimer;
178+
165179
// Store it because we need it for the cleanup test.
166180
private String dbPath;
167181
private RocksDB db = null;
@@ -203,8 +217,10 @@ private Configuration createBackendConfig() {
203217
Configuration configuration = new Configuration();
204218
configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
205219
configuration.set(
206-
RocksDBOptions.TIMER_SERVICE_FACTORY,
207-
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
220+
TIMER_SERVICE_FACTORY,
221+
useHeapTimer
222+
? EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP
223+
: EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
208224
configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1));
209225
return configuration;
210226
}
@@ -234,7 +250,7 @@ protected boolean supportsAsynchronousSnapshots() {
234250

235251
@Override
236252
protected boolean isSafeToReuseKVState() {
237-
return true;
253+
return !useHeapTimer;
238254
}
239255

240256
// small safety net for instance cleanups, so that no native objects are left

0 commit comments

Comments
 (0)