Skip to content

Commit 9c4c31e

Browse files
committed
[FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend
1 parent 5db1ffa commit 9c4c31e

16 files changed

+409
-161
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -596,18 +596,14 @@ public void dispose() {
596596
IOUtils.closeQuietly(db);
597597

598598
LOG.info(
599-
"Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.",
600-
optionsContainer.getLocalBasePath(),
601-
optionsContainer.getRemoteBasePath());
599+
"Closed ForSt State Backend. Cleaning up ForSt: {}.",
600+
optionsContainer.getPathContainer());
602601

603602
try {
604603
optionsContainer.clearDirectories();
605604
} catch (Exception ex) {
606605
LOG.warn(
607-
"Could not delete ForSt local working directory {}, remote working directory {}.",
608-
optionsContainer.getLocalBasePath(),
609-
optionsContainer.getRemoteBasePath(),
610-
ex);
606+
"Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex);
611607
}
612608

613609
IOUtils.closeQuietly(optionsContainer);
@@ -624,12 +620,12 @@ public boolean isSafeToReuseKVState() {
624620

625621
@VisibleForTesting
626622
Path getLocalBasePath() {
627-
return optionsContainer.getLocalBasePath();
623+
return optionsContainer.getPathContainer().getLocalBasePath();
628624
}
629625

630626
@VisibleForTesting
631627
Path getRemoteBasePath() {
632-
return optionsContainer.getRemoteBasePath();
628+
return optionsContainer.getPathContainer().getRemoteBasePath();
633629
}
634630

635631
@Override

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
302302
// deletion in file mapping manager.
303303
optionsContainer.forceClearRemoteDirectories();
304304
} catch (Exception ex) {
305-
logger.warn(
306-
"Failed to delete ForSt local base path {}, remote base path {}.",
307-
optionsContainer.getLocalBasePath(),
308-
optionsContainer.getRemoteBasePath(),
309-
ex);
305+
logger.warn("Failed to delete ForSt: {}.", optionsContainer.getPathContainer(), ex);
310306
}
311307
IOUtils.closeQuietly(optionsContainer);
312308
IOUtils.closeQuietly(snapshotStrategy);
@@ -322,9 +318,8 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
322318
InternalKeyContext<K> keyContext =
323319
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
324320
logger.info(
325-
"Finished building ForSt keyed state-backend at local base path: {}, remote base path: {}.",
326-
optionsContainer.getLocalBasePath(),
327-
optionsContainer.getRemoteBasePath());
321+
"Finished building ForSt keyed state-backend at {}",
322+
optionsContainer.getPathContainer());
328323
return new ForStKeyedStateBackend<>(
329324
backendUID,
330325
executionConfig,
@@ -360,8 +355,8 @@ private ForStRestoreOperation getForStRestoreOperation(
360355
// working dir. We will implement this in ForStDB later, but before that, we achieved this
361356
// by setting the dbPath to "/" when the dfs directory existed.
362357
Path instanceForStPath =
363-
optionsContainer.getRemoteForStPath() == null
364-
? optionsContainer.getLocalForStPath()
358+
optionsContainer.getPathContainer().getRemoteForStPath() == null
359+
? optionsContainer.getPathContainer().getLocalForStPath()
365360
: new Path("/db");
366361

367362
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst;
20+
21+
import org.apache.flink.core.fs.Path;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import javax.annotation.Nullable;
27+
28+
/** Container for ForSt paths. */
29+
public class ForStPathContainer {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
32+
public static final String DB_DIR_STRING = "db";
33+
34+
@Nullable final Path localJobPath;
35+
@Nullable private final Path localBasePath;
36+
@Nullable private final Path localForStPath;
37+
38+
@Nullable private final Path remoteJobPath;
39+
@Nullable private final Path remoteBasePath;
40+
@Nullable private final Path remoteForStPath;
41+
42+
public static ForStPathContainer empty() {
43+
return of(null, null, null, null);
44+
}
45+
46+
public static ForStPathContainer ofLocal(
47+
@Nullable Path localJobPath, @Nullable Path localBasePath) {
48+
return new ForStPathContainer(localJobPath, localBasePath, null, null);
49+
}
50+
51+
public static ForStPathContainer of(
52+
@Nullable Path localJobPath,
53+
@Nullable Path localBasePath,
54+
@Nullable Path remoteJobPath,
55+
@Nullable Path remoteBasePath) {
56+
return new ForStPathContainer(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
57+
}
58+
59+
public ForStPathContainer(
60+
@Nullable Path localJobPath,
61+
@Nullable Path localBasePath,
62+
@Nullable Path remoteJobPath,
63+
@Nullable Path remoteBasePath) {
64+
this.localJobPath = localJobPath;
65+
this.localBasePath = localBasePath;
66+
this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
67+
68+
this.remoteJobPath = remoteJobPath;
69+
this.remoteBasePath = remoteBasePath;
70+
this.remoteForStPath =
71+
remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
72+
73+
LOG.info(
74+
"ForStPathContainer: localJobPath: {}, localBasePath: {}, localForStPath:{}, remoteJobPath: {}, remoteBasePath: {}, remoteForStPath: {}",
75+
localJobPath,
76+
localBasePath,
77+
localForStPath,
78+
remoteJobPath,
79+
remoteBasePath,
80+
remoteForStPath);
81+
}
82+
83+
public @Nullable Path getLocalJobPath() {
84+
return localJobPath;
85+
}
86+
87+
public @Nullable Path getLocalBasePath() {
88+
return localBasePath;
89+
}
90+
91+
public @Nullable Path getLocalForStPath() {
92+
return localForStPath;
93+
}
94+
95+
public @Nullable Path getRemoteJobPath() {
96+
return remoteJobPath;
97+
}
98+
99+
public @Nullable Path getRemoteBasePath() {
100+
return remoteBasePath;
101+
}
102+
103+
public @Nullable Path getRemoteForStPath() {
104+
return remoteForStPath;
105+
}
106+
107+
public Path getJobPath() {
108+
if (remoteJobPath != null) {
109+
return remoteJobPath;
110+
} else {
111+
return localJobPath;
112+
}
113+
}
114+
115+
public Path getBasePath() {
116+
if (remoteBasePath != null) {
117+
return remoteBasePath;
118+
} else {
119+
return localBasePath;
120+
}
121+
}
122+
123+
public Path getDbPath() {
124+
if (remoteForStPath != null) {
125+
return remoteForStPath;
126+
} else {
127+
return localForStPath;
128+
}
129+
}
130+
131+
@Override
132+
public String toString() {
133+
return "ForStPathContainer(localJobPath = ["
134+
+ localJobPath
135+
+ "] localBasePath = ["
136+
+ localBasePath
137+
+ "] localForStPath = ["
138+
+ localForStPath
139+
+ "] remoteJobPath = ["
140+
+ remoteJobPath
141+
+ "] remoteBasePath = ["
142+
+ remoteBasePath
143+
+ "] remoteForStPath = ["
144+
+ remoteForStPath
145+
+ "])";
146+
}
147+
}

0 commit comments

Comments
 (0)