Skip to content

Commit 6d045aa

Browse files
committed
[Fix][Zeta] Fix pendingJobMasterMap resource leak
1 parent 272b886 commit 6d045aa

File tree

9 files changed

+0
-27
lines changed

9 files changed

+0
-27
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ public void testBatchJobRunOkIn2Node() throws Exception {
112112
engineClient.createExecutionContext(
113113
testResources.getRight(), jobConfig, seaTunnelConfig);
114114
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
115-
TimeUnit.SECONDS.sleep(2);
116115
CompletableFuture<JobStatus> objectCompletableFuture =
117116
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
118117
Awaitility.await()
@@ -232,7 +231,6 @@ public void testStreamJobRunOkIn2Node() throws Exception {
232231
engineClient.createExecutionContext(
233232
testResources.getRight(), jobConfig, seaTunnelConfig);
234233
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
235-
TimeUnit.SECONDS.sleep(2);
236234
CompletableFuture<JobStatus> objectCompletableFuture =
237235
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
238236

@@ -328,7 +326,6 @@ public void testBatchJobRestoreIn2NodeWorkerDown() throws Exception {
328326
engineClient.createExecutionContext(
329327
testResources.getRight(), jobConfig, seaTunnelConfig);
330328
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
331-
TimeUnit.SECONDS.sleep(2);
332329
CompletableFuture<JobStatus> objectCompletableFuture =
333330
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
334331

@@ -424,7 +421,6 @@ public void testStreamJobRestoreIn2NodeWorkerDown() throws Exception {
424421
engineClient.createExecutionContext(
425422
testResources.getRight(), jobConfig, seaTunnelConfig);
426423
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
427-
TimeUnit.SECONDS.sleep(2);
428424
CompletableFuture<JobStatus> waitForCompletableFuture =
429425
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
430426
Awaitility.await()
@@ -540,7 +536,6 @@ public void testBatchJobRestoreIn2NodeMasterDown() throws Exception {
540536
engineClient.createExecutionContext(
541537
testResources.getRight(), jobConfig, seaTunnelConfig);
542538
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
543-
TimeUnit.SECONDS.sleep(2);
544539
CompletableFuture<JobStatus> objectCompletableFuture =
545540
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
546541

@@ -640,7 +635,6 @@ public void testStreamJobRestoreIn2NodeMasterDown() throws Exception {
640635
engineClient.createExecutionContext(
641636
testResources.getRight(), jobConfig, seaTunnelConfig);
642637
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
643-
TimeUnit.SECONDS.sleep(2);
644638
CompletableFuture<JobStatus> objectCompletableFuture =
645639
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
646640

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public void testTwoPipelineBatchJobRunOkIn2Node() throws Exception {
118118
engineClient.createExecutionContext(
119119
testResources.getRight(), jobConfig, seaTunnelConfig);
120120
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
121-
TimeUnit.SECONDS.sleep(2);
122121
CompletableFuture<JobStatus> objectCompletableFuture =
123122
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
124123

@@ -247,7 +246,6 @@ public void testTwoPipelineStreamJobRunOkIn2Node() throws Exception {
247246
engineClient.createExecutionContext(
248247
testResources.getRight(), jobConfig, seaTunnelConfig);
249248
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
250-
TimeUnit.SECONDS.sleep(2);
251249
CompletableFuture<JobStatus> objectCompletableFuture =
252250
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
253251

@@ -345,7 +343,6 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws Exception {
345343
engineClient.createExecutionContext(
346344
testResources.getRight(), jobConfig, seaTunnelConfig);
347345
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
348-
TimeUnit.SECONDS.sleep(2);
349346
CompletableFuture<JobStatus> objectCompletableFuture =
350347
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
351348

@@ -458,7 +455,6 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws Exception
458455
engineClient.createExecutionContext(
459456
testResources.getRight(), jobConfig, seaTunnelConfig);
460457
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
461-
TimeUnit.SECONDS.sleep(2);
462458
CompletableFuture<JobStatus> objectCompletableFuture =
463459
CompletableFuture.supplyAsync(() -> clientJobProxy.waitForJobComplete());
464460

@@ -582,7 +578,6 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws Exception {
582578
engineClient.createExecutionContext(
583579
testResources.getRight(), jobConfig, seaTunnelConfig);
584580
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
585-
TimeUnit.SECONDS.sleep(2);
586581
CompletableFuture<JobStatus> objectCompletableFuture =
587582
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
588583

@@ -692,7 +687,6 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws Exception
692687
engineClient.createExecutionContext(
693688
testResources.getRight(), jobConfig, seaTunnelConfig);
694689
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
695-
TimeUnit.SECONDS.sleep(2);
696690
CompletableFuture<JobStatus> objectCompletableFuture =
697691
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
698692

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ private static void runJobFileWithAssertEndStatus(
9191
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
9292

9393
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
94-
TimeUnit.SECONDS.sleep(2);
9594
CompletableFuture<JobStatus> objectCompletableFuture =
9695
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
9796

@@ -148,7 +147,6 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException {
148147
ClientJobExecutionEnvironment jobExecutionEnv =
149148
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
150149
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
151-
TimeUnit.SECONDS.sleep(2);
152150
CompletableFuture<JobStatus> completableFuture =
153151
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
154152
await().atMost(300000, TimeUnit.MILLISECONDS)
@@ -171,7 +169,6 @@ public void testValidJobNameInJobConfig() throws ExecutionException, Interrupted
171169
ClientJobExecutionEnvironment jobExecutionEnv =
172170
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
173171
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
174-
TimeUnit.SECONDS.sleep(2);
175172
CompletableFuture<JobStatus> completableFuture =
176173
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
177174
await().atMost(300000, TimeUnit.MILLISECONDS)
@@ -218,7 +215,6 @@ public void testExpiredJobWasDeleted() throws Exception {
218215
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
219216

220217
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
221-
TimeUnit.SECONDS.sleep(2);
222218
Assertions.assertEquals(clientJobProxy.waitForJobComplete(), JobStatus.FINISHED);
223219
await().atMost(65, TimeUnit.SECONDS)
224220
.untilAsserted(

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public void testSlotNotEnough() throws Exception {
6767
engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
6868

6969
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
70-
TimeUnit.SECONDS.sleep(2);
7170
CompletableFuture<JobStatus> objectCompletableFuture =
7271
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
7372
Awaitility.await()
@@ -120,7 +119,6 @@ public void testSlotEnough() throws Exception {
120119
engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
121120

122121
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
123-
TimeUnit.SECONDS.sleep(2);
124122
CompletableFuture<JobStatus> objectCompletableFuture =
125123
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
126124
Awaitility.await()

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SplitClusterFaultToleranceIT.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ public void testBatchJobRunOk() throws Exception {
120120
engineClient.createExecutionContext(
121121
testResources.getRight(), jobConfig, seaTunnelConfig);
122122
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
123-
TimeUnit.SECONDS.sleep(2);
124123
CompletableFuture<JobStatus> objectCompletableFuture =
125124
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
126125
Awaitility.await()
@@ -264,7 +263,6 @@ public void testStreamJobRunOk() throws Exception {
264263
engineClient.createExecutionContext(
265264
testResources.getRight(), jobConfig, seaTunnelConfig);
266265
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
267-
TimeUnit.SECONDS.sleep(2);
268266
CompletableFuture<JobStatus> objectCompletableFuture =
269267
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
270268

@@ -376,7 +374,6 @@ public void testBatchJobRestoreInWorkerDown() throws Exception {
376374
engineClient.createExecutionContext(
377375
testResources.getRight(), jobConfig, seaTunnelConfig);
378376
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
379-
TimeUnit.SECONDS.sleep(2);
380377
CompletableFuture<JobStatus> objectCompletableFuture =
381378
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
382379

@@ -634,7 +631,6 @@ public void testBatchJobRestoreInMasterDown() throws Exception {
634631
engineClient.createExecutionContext(
635632
testResources.getRight(), jobConfig, seaTunnelConfig);
636633
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
637-
TimeUnit.SECONDS.sleep(2);
638634
CompletableFuture<JobStatus> objectCompletableFuture =
639635
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
640636

@@ -757,7 +753,6 @@ public void testStreamJobRestoreInMasterDown() throws Exception {
757753
engineClient.createExecutionContext(
758754
testResources.getRight(), jobConfig, seaTunnelConfig);
759755
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
760-
TimeUnit.SECONDS.sleep(2);
761756
CompletableFuture<JobStatus> objectCompletableFuture =
762757
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
763758

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin
134134
engineClient.createExecutionContext(
135135
testResources.getRight(), jobConfig, seaTunnelConfig);
136136
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
137-
TimeUnit.SECONDS.sleep(2);
138137
CompletableFuture<JobStatus> objectCompletableFuture =
139138
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
140139
Awaitility.await()

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/ConnectorPackageClientTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ public void testExecuteJob() {
252252
ClientJobExecutionEnvironment jobExecutionEnv =
253253
seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
254254
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
255-
TimeUnit.SECONDS.sleep(2);
256255
CompletableFuture<JobStatus> objectCompletableFuture =
257256
CompletableFuture.supplyAsync(() -> clientJobProxy.waitForJobComplete());
258257

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ public void testExecuteJob() {
120120
ClientJobExecutionEnvironment jobExecutionEnv =
121121
seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
122122
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
123-
TimeUnit.SECONDS.sleep(2);
124123
CompletableFuture<JobStatus> objectCompletableFuture =
125124
CompletableFuture.supplyAsync(
126125
() -> {

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ public void canNotSubmitJobWhenHaveNoWorkerNode() {
160160
ClientJobExecutionEnvironment jobExecutionEnv =
161161
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
162162
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
163-
TimeUnit.SECONDS.sleep(2);
164163
PassiveCompletableFuture<JobResult> jobResultPassiveCompletableFuture =
165164
clientJobProxy.doWaitForJobComplete();
166165
await().atMost(60000, TimeUnit.MILLISECONDS)

0 commit comments

Comments
 (0)