Skip to content

Commit 78f6e77

Browse files
authored
[FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend (#27042)
1 parent 1b01b52 commit 78f6e77

16 files changed

+460
-162
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -597,18 +597,14 @@ public void dispose() {
597597
IOUtils.closeQuietly(db);
598598

599599
LOG.info(
600-
"Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.",
601-
optionsContainer.getLocalBasePath(),
602-
optionsContainer.getRemoteBasePath());
600+
"Closed ForSt State Backend. Cleaning up ForSt: {}.",
601+
optionsContainer.getPathContainer());
603602

604603
try {
605604
optionsContainer.clearDirectories();
606605
} catch (Exception ex) {
607606
LOG.warn(
608-
"Could not delete ForSt local working directory {}, remote working directory {}.",
609-
optionsContainer.getLocalBasePath(),
610-
optionsContainer.getRemoteBasePath(),
611-
ex);
607+
"Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex);
612608
}
613609

614610
IOUtils.closeQuietly(optionsContainer);
@@ -630,12 +626,12 @@ public boolean isSafeToReuseKVState() {
630626

631627
@VisibleForTesting
632628
Path getLocalBasePath() {
633-
return optionsContainer.getLocalBasePath();
629+
return optionsContainer.getPathContainer().getLocalBasePath();
634630
}
635631

636632
@VisibleForTesting
637633
Path getRemoteBasePath() {
638-
return optionsContainer.getRemoteBasePath();
634+
return optionsContainer.getPathContainer().getRemoteBasePath();
639635
}
640636

641637
@Override

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
302302
// deletion in file mapping manager.
303303
optionsContainer.forceClearRemoteDirectories();
304304
} catch (Exception ex) {
305-
logger.warn(
306-
"Failed to delete ForSt local base path {}, remote base path {}.",
307-
optionsContainer.getLocalBasePath(),
308-
optionsContainer.getRemoteBasePath(),
309-
ex);
305+
logger.warn("Failed to delete ForSt: {}.", optionsContainer.getPathContainer(), ex);
310306
}
311307
IOUtils.closeQuietly(optionsContainer);
312308
IOUtils.closeQuietly(snapshotStrategy);
@@ -322,9 +318,8 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
322318
InternalKeyContext<K> keyContext =
323319
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
324320
logger.info(
325-
"Finished building ForSt keyed state-backend at local base path: {}, remote base path: {}.",
326-
optionsContainer.getLocalBasePath(),
327-
optionsContainer.getRemoteBasePath());
321+
"Finished building ForSt keyed state-backend at {}",
322+
optionsContainer.getPathContainer());
328323
return new ForStKeyedStateBackend<>(
329324
backendUID,
330325
executionConfig,
@@ -360,8 +355,8 @@ private ForStRestoreOperation getForStRestoreOperation(
360355
// working dir. We will implement this in ForStDB later, but before that, we achieved this
361356
// by setting the dbPath to "/" when the dfs directory existed.
362357
Path instanceForStPath =
363-
optionsContainer.getRemoteForStPath() == null
364-
? optionsContainer.getLocalForStPath()
358+
optionsContainer.getPathContainer().getRemoteForStPath() == null
359+
? optionsContainer.getPathContainer().getLocalForStPath()
365360
: new Path("/db");
366361

367362
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
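flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java

Lines changed: 193 additions & 0 deletions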
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst;
20+
21+
import org.apache.flink.core.fs.Path;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import javax.annotation.Nullable;
27+
28+
import java.util.Objects;
29+
30+
/** Container for ForSt paths. */
31+
public class ForStPathContainer {
32+
33+
private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
34+
public static final String DB_DIR_STRING = "db";
35+
36+
/**
37+
* Local job path. This indicates the parent directory of ForSt, which ends with the Flink
38+
* JobID.
39+
*/
40+
@Nullable private final Path localJobPath;
41+
42+
/**
43+
* Local base path. This includes the information of the subtask that holds ForSt, such as the
44+
* Operator Identifier and subtask index.
45+
*/
46+
@Nullable private final Path localBasePath;
47+
48+
/** Local ForSt path. This is the directory of ForSt DB, which ends with 'db'. */
49+
@Nullable private final Path localForStPath;
50+
51+
/**
52+
* Remote paths of ForSt. Similar to the respective Path mentioned above, but located under the
53+
* remote parent path.
54+
*/
55+
@Nullable private final Path remoteJobPath;
56+
57+
@Nullable private final Path remoteBasePath;
58+
@Nullable private final Path remoteForStPath;
59+
60+
public static ForStPathContainer empty() {
61+
return of(null, null, null, null);
62+
}
63+
64+
public static ForStPathContainer ofLocal(
65+
@Nullable Path localJobPath, @Nullable Path localBasePath) {
66+
return new ForStPathContainer(localJobPath, localBasePath, null, null);
67+
}
68+
69+
public static ForStPathContainer of(
70+
@Nullable Path localJobPath,
71+
@Nullable Path localBasePath,
72+
@Nullable Path remoteJobPath,
73+
@Nullable Path remoteBasePath) {
74+
return new ForStPathContainer(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
75+
}
76+
77+
public ForStPathContainer(
78+
@Nullable Path localJobPath,
79+
@Nullable Path localBasePath,
80+
@Nullable Path remoteJobPath,
81+
@Nullable Path remoteBasePath) {
82+
this.localJobPath = localJobPath;
83+
this.localBasePath = localBasePath;
84+
this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
85+
86+
this.remoteJobPath = remoteJobPath;
87+
this.remoteBasePath = remoteBasePath;
88+
this.remoteForStPath =
89+
remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
90+
91+
LOG.info(
92+
"ForStPathContainer: localJobPath: {}, localBasePath: {}, localForStPath:{}, remoteJobPath: {}, remoteBasePath: {}, remoteForStPath: {}",
93+
localJobPath,
94+
localBasePath,
95+
localForStPath,
96+
remoteJobPath,
97+
remoteBasePath,
98+
remoteForStPath);
99+
}
100+
101+
public @Nullable Path getLocalJobPath() {
102+
return localJobPath;
103+
}
104+
105+
public @Nullable Path getLocalBasePath() {
106+
return localBasePath;
107+
}
108+
109+
public @Nullable Path getLocalForStPath() {
110+
return localForStPath;
111+
}
112+
113+
public @Nullable Path getRemoteJobPath() {
114+
return remoteJobPath;
115+
}
116+
117+
public @Nullable Path getRemoteBasePath() {
118+
return remoteBasePath;
119+
}
120+
121+
public @Nullable Path getRemoteForStPath() {
122+
return remoteForStPath;
123+
}
124+
125+
public Path getJobPath() {
126+
if (remoteJobPath != null) {
127+
return remoteJobPath;
128+
} else {
129+
return localJobPath;
130+
}
131+
}
132+
133+
public Path getBasePath() {
134+
if (remoteBasePath != null) {
135+
return remoteBasePath;
136+
} else {
137+
return localBasePath;
138+
}
139+
}
140+
141+
public Path getDbPath() {
142+
if (remoteForStPath != null) {
143+
return remoteForStPath;
144+
} else {
145+
return localForStPath;
146+
}
147+
}
148+
149+
@Override
150+
public String toString() {
151+
return "ForStPathContainer(localJobPath = ["
152+
+ localJobPath
153+
+ "] localBasePath = ["
154+
+ localBasePath
155+
+ "] localForStPath = ["
156+
+ localForStPath
157+
+ "] remoteJobPath = ["
158+
+ remoteJobPath
159+
+ "] remoteBasePath = ["
160+
+ remoteBasePath
161+
+ "] remoteForStPath = ["
162+
+ remoteForStPath
163+
+ "])";
164+
}
165+
166+
@Override
167+
public boolean equals(Object o) {
168+
if (this == o) {
169+
return true;
170+
}
171+
if (o == null || getClass() != o.getClass()) {
172+
return false;
173+
}
174+
ForStPathContainer that = (ForStPathContainer) o;
175+
return Objects.equals(localJobPath, that.localJobPath)
176+
&& Objects.equals(localBasePath, that.localBasePath)
177+
&& Objects.equals(localForStPath, that.localForStPath)
178+
&& Objects.equals(remoteJobPath, that.remoteJobPath)
179+
&& Objects.equals(remoteBasePath, that.remoteBasePath)
180+
&& Objects.equals(remoteForStPath, that.remoteForStPath);
181+
}
182+
183+
@Override
184+
public int hashCode() {
185+
return Objects.hash(
186+
localJobPath,
187+
localBasePath,
188+
localForStPath,
189+
remoteJobPath,
190+
remoteBasePath,
191+
remoteForStPath);
192+
}
193+
}

0 commit comments

Comments
 (0)