Skip to content

Commit c43d57d

Browse files
authored
[Feature][Connectors-v2] Clean up temporary files for paimon sink (#9819)
1 parent 2653f67 commit c43d57d

File tree

3 files changed

+108
-10
lines changed

3 files changed

+108
-10
lines changed

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.paimon.CoreOptions;
5050
import org.apache.paimon.data.InternalRow;
5151
import org.apache.paimon.disk.IOManager;
52+
import org.apache.paimon.disk.IOManagerImpl;
5253
import org.apache.paimon.schema.TableSchema;
5354
import org.apache.paimon.table.BucketMode;
5455
import org.apache.paimon.table.FileStoreTable;
@@ -83,6 +84,8 @@ public class PaimonSinkWriter
8384

8485
private FileStoreTable paimonTable;
8586

87+
private final IOManagerImpl ioManager;
88+
8689
private TableWrite tableWrite;
8790

8891
private final List<CommitMessage> committables = new ArrayList<>();
@@ -150,6 +153,9 @@ public PaimonSinkWriter(
150153
this.taskIndex = context.getIndexOfSubtask();
151154
this.paimonSinkConfig = paimonSinkConfig;
152155
this.sinkPaimonTableSchema = this.paimonTable.schema();
156+
this.ioManager =
157+
(IOManagerImpl)
158+
IOManager.create(splitPaths(paimonSinkConfig.getChangelogTmpPath()));
153159
this.newTableWrite();
154160
BucketMode bucketMode = this.paimonTable.bucketMode();
155161
// https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket
@@ -275,12 +281,7 @@ private void reOpenTableWrite() {
275281
private void newTableWrite() {
276282
TableWrite oldTableWrite = this.tableWrite;
277283
tableWriteClose(oldTableWrite);
278-
this.tableWrite =
279-
this.paimonTable
280-
.newWrite(commitUser)
281-
.withIOManager(
282-
IOManager.create(
283-
splitPaths(paimonSinkConfig.getChangelogTmpPath())));
284+
this.tableWrite = this.paimonTable.newWrite(commitUser).withIOManager(ioManager);
284285
}
285286

286287
@Override
@@ -329,6 +330,11 @@ public void close() throws IOException {
329330
if (Objects.nonNull(paimonCatalog)) {
330331
paimonCatalog.close();
331332
}
333+
try {
334+
ioManager.close();
335+
} catch (Exception e) {
336+
log.warn("Failed to close io manager in paimon sink writer.", e);
337+
}
332338
}
333339
}
334340

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.e2e.connector.paimon;
1919

20+
import org.apache.seatunnel.common.utils.FileUtils;
2021
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions;
2122
import org.apache.seatunnel.e2e.common.TestResource;
2223
import org.apache.seatunnel.e2e.common.TestSuiteBase;
@@ -43,6 +44,7 @@
4344
import org.testcontainers.containers.Container;
4445
import org.testcontainers.utility.MountableFile;
4546

47+
import java.io.File;
4648
import java.io.IOException;
4749
import java.nio.file.Path;
4850
import java.util.ArrayList;
@@ -53,10 +55,10 @@
5355
disabledReason =
5456
"Paimon does not support flink 1.13, Spark 2.4.6 has a jar package(zstd-jni-version.jar) version compatibility issue.")
5557
public class PaimonIT extends TestSuiteBase implements TestResource {
56-
private String rootUser = "root";
57-
private String rootPassword = "123456";
58-
private String paimonUser = "paimon";
59-
private String paimonUserPassword = "123456";
58+
private final String rootUser = "root";
59+
private final String rootPassword = "123456";
60+
private final String paimonUser = "paimon";
61+
private final String paimonUserPassword = "123456";
6062

6163
private PrivilegedCatalog privilegedCatalog;
6264
private final String DATABASE_NAME = "default";
@@ -184,4 +186,22 @@ public void privilegeEnabledPaimonSourceUnAuthorized(TestContainer container) th
184186
container.executeJob("/paimon_to_paimon_privilege1.conf");
185187
Assertions.assertEquals(1, execResult1.getExitCode());
186188
}
189+
190+
@TestTemplate
191+
public void jobFinishedCleanTmpFiles(TestContainer container) throws Exception {
192+
// fake to paimon
193+
Container.ExecResult execResult =
194+
container.executeJob("/fake_to_paimon_with_change_log_tmp.conf");
195+
Assertions.assertEquals(0, execResult.getExitCode());
196+
// check job finished clean up tmp files
197+
String hostName = System.getProperty("user.name");
198+
boolean isWindows =
199+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
200+
String tmpDir =
201+
isWindows
202+
? String.format("C:/Users/%s/tmp/seatunnel_mnt/paimon_tmp", hostName)
203+
: "/tmp/seatunnel_mnt/paimon_tmp";
204+
List<File> files = FileUtils.listFile(tmpDir);
205+
Assertions.assertTrue(CollectionUtils.isEmpty(files));
206+
}
187207
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
22+
# You can set spark configuration here
23+
spark.app.name = "SeaTunnel"
24+
spark.executor.instances = 2
25+
spark.executor.cores = 1
26+
spark.executor.memory = "1g"
27+
spark.master = local
28+
}
29+
30+
source {
31+
FakeSource {
32+
auto.increment.enabled = true
33+
auto.increment.start = 1
34+
row.num = 100000
35+
schema = {
36+
fields {
37+
pk_id = bigint
38+
c_map = "map<string, string>"
39+
c_array = "array<int>"
40+
c_string = string
41+
c_boolean = boolean
42+
c_tinyint = tinyint
43+
c_smallint = smallint
44+
c_int = int
45+
c_bigint = bigint
46+
c_float = float
47+
c_double = double
48+
c_decimal = "decimal(30, 8)"
49+
c_bytes = bytes
50+
c_date = date
51+
c_timestamp = timestamp
52+
c_time = time
53+
}
54+
primaryKey {
55+
name = "pk_id"
56+
columnNames = [pk_id]
57+
}
58+
}
59+
plugin_output = "fake"
60+
}
61+
}
62+
63+
sink {
64+
Paimon {
65+
warehouse = "/tmp/seatunnel_mnt/paimon"
66+
database = "default"
67+
table = "st_test"
68+
paimon.table.write-props = {
69+
changelog-tmp-path = "/tmp/seatunnel_mnt/paimon_tmp"
70+
}
71+
}
72+
}

0 commit comments

Comments (0)