Skip to content

Commit ee96c95

Browse files
committed
[Fix][Zeta] Fix pendingJobMasterMap resource leak
1 parent da06802 commit ee96c95

File tree

5 files changed

+17
-59
lines changed

5 files changed

+17
-59
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,9 +619,6 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws Exception {
619619
+ FileUtils.getFileLineNumberFromDir(
620620
testResources.getLeft())
621621
+ "=================================\n");
622-
JobStatus jobStatus = clientJobProxy.getJobStatus();
623-
System.out.println("++++++++++++++++++++++++++++++++++++++++");
624-
System.out.println(jobStatus);
625622
Assertions.assertTrue(objectCompletableFuture.isDone());
626623
Assertions.assertEquals(
627624
JobStatus.FINISHED, objectCompletableFuture.get());

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public void testMasterNodeActive() {
9595
CoordinatorService coordinatorService =
9696
server2.getCoordinatorService();
9797
Assertions.assertTrue(coordinatorService.isCoordinatorActive());
98-
} catch (Exception e) {
99-
Assertions.assertInstanceOf(SeaTunnelEngineException.class, e);
98+
} catch (SeaTunnelEngineException e) {
99+
Assertions.fail("Should not throw SeaTunnelEngineException here.");
100100
}
101101
});
102102
instance2.shutdown();
@@ -203,6 +203,8 @@ void testCleanupRunningJobStateIMap() {
203203
"batch_fake_to_console.conf",
204204
"test_cleanup_running_job_state_imap");
205205
CoordinatorService coordinatorService = jobInformation.coordinatorService;
206+
IMap<Object, Object> runningJobStateIMap =
207+
coordinatorService.getJobMaster(jobInformation.jobId).getRunningJobStateIMap();
206208

207209
await().atMost(10000, TimeUnit.MILLISECONDS)
208210
.untilAsserted(
@@ -229,6 +231,7 @@ void testCleanupRunningJobStateIMap() {
229231
coordinatorService.getJobMaster(jobInformation.jobId);
230232
// job master should be null
231233
Assertions.assertNull(jobMaster);
234+
Assertions.assertTrue(runningJobStateIMap.isEmpty());
232235
});
233236

234237
jobInformation.coordinatorService.clearCoordinatorService();
@@ -384,13 +387,9 @@ private JobInformation submitJob(String testClassName, String jobConfigFile, Str
384387
Data data =
385388
coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);
386389

387-
try {
388-
coordinatorService
389-
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
390-
.join();
391-
} catch (Throwable e) {
392-
log.error("submit job failed", e);
393-
}
390+
coordinatorService
391+
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
392+
.join();
394393
return new JobInformation(coordinatorServiceTest, coordinatorService, jobId);
395394
}
396395

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceWithCancelPendingJobTest.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.Collections;
4949
import java.util.Map;
5050
import java.util.concurrent.TimeUnit;
51-
import java.util.concurrent.atomic.AtomicReference;
5251

5352
import static org.awaitility.Awaitility.await;
5453

@@ -170,28 +169,16 @@ private JobMaster newJobInstanceWithRunningState(long jobId, boolean restore)
170169
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
171170
voidPassiveCompletableFuture.join();
172171

173-
AtomicReference<JobMaster> jobMasterReference = new AtomicReference<>();
174-
175-
await().atMost(30000, TimeUnit.MILLISECONDS)
176-
.untilAsserted(
177-
() -> {
178-
JobMaster jobMaster =
179-
server.getCoordinatorService().getJobMaster(jobId);
180-
Assertions.assertNotNull(jobMaster);
181-
jobMasterReference.set(jobMaster);
182-
});
172+
JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
183173

184174
// waiting for job status turn to running
185175
await().atMost(120, TimeUnit.SECONDS)
186176
.untilAsserted(
187-
() ->
188-
Assertions.assertEquals(
189-
JobStatus.PENDING,
190-
jobMasterReference.get().getJobStatus()));
177+
() -> Assertions.assertEquals(JobStatus.PENDING, jobMaster.getJobStatus()));
191178

192179
// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
193180
// status become running again
194181
Thread.sleep(5000);
195-
return jobMasterReference.get();
182+
return jobMaster;
196183
}
197184
}

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.jupiter.api.condition.OS;
2828

2929
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicReference;
3130

3231
import static org.awaitility.Awaitility.await;
3332

@@ -42,25 +41,14 @@ public void testCheckpointRestoreToFailEnd() {
4241
long jobId = System.currentTimeMillis();
4342
startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false);
4443

45-
AtomicReference<JobMaster> jobMasterReference = new AtomicReference<>();
46-
await().atMost(60, TimeUnit.SECONDS)
47-
.untilAsserted(
48-
() -> {
49-
JobMaster jobMaster =
50-
server.getCoordinatorService().getJobMaster(jobId);
51-
Assertions.assertNotNull(jobMaster);
52-
jobMasterReference.set(jobMaster);
53-
});
54-
55-
Assertions.assertEquals(
56-
1, jobMasterReference.get().getPhysicalPlan().getPipelineList().size());
44+
JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
45+
Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size());
5746
await().atMost(240, TimeUnit.SECONDS)
5847
.untilAsserted(
5948
() ->
6049
Assertions.assertEquals(
6150
3,
62-
jobMasterReference
63-
.get()
51+
jobMaster
6452
.getPhysicalPlan()
6553
.getPipelineList()
6654
.get(0)

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.Collections;
5252
import java.util.Map;
5353
import java.util.concurrent.TimeUnit;
54-
import java.util.concurrent.atomic.AtomicReference;
5554

5655
import static org.awaitility.Awaitility.await;
5756

@@ -311,28 +310,16 @@ private JobMaster newJobInstanceWithRunningState(long jobId, boolean restore)
311310
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
312311
voidPassiveCompletableFuture.join();
313312

314-
AtomicReference<JobMaster> jobMasterReference = new AtomicReference<>();
315-
316-
await().atMost(30000, TimeUnit.MILLISECONDS)
317-
.untilAsserted(
318-
() -> {
319-
JobMaster jobMaster =
320-
server.getCoordinatorService().getJobMaster(jobId);
321-
Assertions.assertNotNull(jobMaster);
322-
jobMasterReference.set(jobMaster);
323-
});
313+
JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
324314

325315
// waiting for job status turn to running
326316
await().atMost(120000, TimeUnit.MILLISECONDS)
327317
.untilAsserted(
328-
() ->
329-
Assertions.assertEquals(
330-
JobStatus.RUNNING,
331-
jobMasterReference.get().getJobStatus()));
318+
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
332319

333320
// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
334321
// status become running again
335322
Thread.sleep(5000);
336-
return jobMasterReference.get();
323+
return jobMaster;
337324
}
338325
}

0 commit comments

Comments
 (0)