Skip to content

Commit b7e13d0

Browse files
committed
[FLINK-38558] Require copy when use heap timer
1 parent ca20e1d commit b7e13d0

File tree

4 files changed

+32
-11
lines changed

4 files changed

+32
-11
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy
938938

939939
@Override
940940
public boolean isSafeToReuseKVState() {
941-
return true;
941+
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
942942
}
943943

944944
@VisibleForTesting

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
5353
import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
54+
import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
5455
import static org.assertj.core.api.Assertions.assertThat;
5556

5657
/** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
@@ -64,7 +65,8 @@ public static List<Object[]> modes() {
6465
new Object[][] {
6566
{
6667
(SupplierWithException<CheckpointStorage, IOException>)
67-
JobManagerCheckpointStorage::new
68+
JobManagerCheckpointStorage::new,
69+
false
6870
},
6971
{
7072
(SupplierWithException<CheckpointStorage, IOException>)
@@ -73,12 +75,17 @@ public static List<Object[]> modes() {
7375
TempDirUtils.newFolder(tempFolder).toURI().toString();
7476
return new FileSystemCheckpointStorage(
7577
new Path(checkpointPath), 0, -1);
76-
}
78+
},
79+
true
7780
}
7881
});
7982
}
8083

81-
@Parameter public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
84+
@Parameter(value = 0)
85+
public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
86+
87+
@Parameter(value = 1)
88+
public boolean useHeapTimer;
8289

8390
@Override
8491
protected CheckpointStorage getCheckpointStorage() throws Exception {
@@ -91,6 +98,11 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
9198
Configuration config = new Configuration();
9299
config.set(LOCAL_DIRECTORIES, tempFolder.toString());
93100
config.set(USE_INGEST_DB_RESTORE_MODE, true);
101+
config.set(
102+
TIMER_SERVICE_FACTORY,
103+
useHeapTimer
104+
? ForStStateBackend.PriorityQueueStateType.HEAP
105+
: ForStStateBackend.PriorityQueueStateType.ForStDB);
94106
return backend.configure(config, Thread.currentThread().getContextClassLoader());
95107
}
96108

@@ -109,7 +121,7 @@ protected boolean supportsAsynchronousSnapshots() {
109121
*/
110122
@Override
111123
protected boolean isSafeToReuseKVState() {
112-
return true;
124+
return !useHeapTimer;
113125
}
114126

115127
@TestTemplate

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointTy
11081108

11091109
@Override
11101110
public boolean isSafeToReuseKVState() {
1111-
return true;
1111+
return !(priorityQueueFactory instanceof HeapPriorityQueueSetFactory);
11121112
}
11131113

11141114
@Override

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
100100
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
101101
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
102+
import static org.apache.flink.state.rocksdb.RocksDBOptions.TIMER_SERVICE_FACTORY;
102103
import static org.assertj.core.api.Assertions.assertThat;
103104
import static org.assertj.core.api.Assertions.assertThatThrownBy;
104105
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,13 +132,15 @@ public static List<Object[]> modes() {
131132
true,
132133
(SupplierWithException<CheckpointStorage, IOException>)
133134
JobManagerCheckpointStorage::new,
135+
false,
134136
false
135137
},
136138
{
137139
true,
138140
(SupplierWithException<CheckpointStorage, IOException>)
139141
JobManagerCheckpointStorage::new,
140-
true
142+
true,
143+
false
141144
},
142145
{
143146
false,
@@ -148,7 +151,8 @@ public static List<Object[]> modes() {
148151
return new FileSystemCheckpointStorage(
149152
new Path(checkpointPath), 0, -1);
150153
},
151-
false
154+
false,
155+
true
152156
}
153157
});
154158
}
@@ -162,6 +166,9 @@ public static List<Object[]> modes() {
162166
@Parameter(value = 2)
163167
public boolean useIngestDB;
164168

169+
@Parameter(value = 3)
170+
public boolean useHeapTimer;
171+
165172
// Store it because we need it for the cleanup test.
166173
private String dbPath;
167174
private RocksDB db = null;
@@ -203,8 +210,10 @@ private Configuration createBackendConfig() {
203210
Configuration configuration = new Configuration();
204211
configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
205212
configuration.set(
206-
RocksDBOptions.TIMER_SERVICE_FACTORY,
207-
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
213+
TIMER_SERVICE_FACTORY,
214+
useHeapTimer
215+
? EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP
216+
: EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
208217
configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1));
209218
return configuration;
210219
}
@@ -234,7 +243,7 @@ protected boolean supportsAsynchronousSnapshots() {
234243

235244
@Override
236245
protected boolean isSafeToReuseKVState() {
237-
return true;
246+
return !useHeapTimer;
238247
}
239248

240249
// small safety net for instance cleanups, so that no native objects are left

0 commit comments

Comments
 (0)