Skip to content

Commit 2d67312

Browse files
authored
Add normal stop (#2727)
* fix log error * Add a normal stop
1 parent acec1e2 commit 2d67312

File tree

8 files changed

+67
-60
lines changed

8 files changed

+67
-60
lines changed

dinky-admin/src/main/java/org/dinky/controller/APIController.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,10 @@ public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws
9595
@GetMapping("/cancel")
9696
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
9797
@ApiOperation("Cancel Flink Job")
98-
public Result<Boolean> cancel(@RequestParam Integer id) {
99-
return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS);
98+
public Result<Boolean> cancel(
99+
@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
100+
return Result.succeed(
101+
taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint), Status.EXECUTE_SUCCESS);
100102
}
101103

102104
/**

dinky-admin/src/main/java/org/dinky/controller/TaskController.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ public Result<JobResult> debugTask(@RequestBody TaskDTO task) throws Exception {
106106
@GetMapping("/cancel")
107107
@Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
108108
@ApiOperation("Cancel Flink Job")
109-
public Result<Void> cancel(@RequestParam Integer id) {
110-
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id))) {
109+
public Result<Void> cancel(@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
110+
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint)) {
111111
return Result.succeed(Status.EXECUTE_SUCCESS);
112112
} else {
113113
return Result.failed(Status.EXECUTE_FAILED);

dinky-admin/src/main/java/org/dinky/service/TaskService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public interface TaskService extends ISuperService<Task> {
114114
* @param task The {@link TaskDTO} object representing the task to cancel.
115115
* @return true if the task job is successfully cancelled, false otherwise.
116116
*/
117-
boolean cancelTaskJob(TaskDTO task);
117+
boolean cancelTaskJob(TaskDTO task, boolean withSavePoint);
118118

119119
/**
120120
* Get the stream graph of the given task job.

dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -367,15 +367,15 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception
367367
if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId())) {
368368
String status = jobInstanceService.getById(task.getJobInstanceId()).getStatus();
369369
if (!JobStatus.isDone(status)) {
370-
cancelTaskJob(task);
370+
cancelTaskJob(task, true);
371371
}
372372
}
373373
return submitTask(
374374
TaskSubmitDto.builder().id(id).savePointPath(savePointPath).build());
375375
}
376376

377377
@Override
378-
public boolean cancelTaskJob(TaskDTO task) {
378+
public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint) {
379379
if (Dialect.isCommonSql(task.getDialect())) {
380380
return true;
381381
}
@@ -385,7 +385,7 @@ public boolean cancelTaskJob(TaskDTO task) {
385385
Assert.notNull(clusterInstance, Status.CLUSTER_NOT_EXIST.getMessage());
386386

387387
JobManager jobManager = JobManager.build(buildJobConfig(task));
388-
return jobManager.cancel(jobInstance.getJid());
388+
return jobManager.cancel(jobInstance.getJid(), withSavePoint);
389389
}
390390

391391
@Override

dinky-core/src/main/java/org/dinky/job/JobManager.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.dinky.context.RowLevelPermissionsContext;
2929
import org.dinky.data.annotations.ProcessStep;
3030
import org.dinky.data.enums.ProcessStepType;
31+
import org.dinky.data.exception.BusException;
3132
import org.dinky.data.model.SystemConfiguration;
3233
import org.dinky.data.result.ErrorResult;
3334
import org.dinky.data.result.ExplainResult;
@@ -378,27 +379,32 @@ public String getJobPlanJson(String statement) {
378379
.getJsonPlan();
379380
}
380381

381-
public boolean cancel(String jobId) {
382+
public boolean cancel(String jobId, boolean withSavePoint) {
382383
if (useGateway && !useRestAPI) {
383384
config.getGatewayConfig()
384385
.setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(), null, null));
385386
Gateway.build(config.getGatewayConfig()).savepointJob();
386387
return true;
387-
} else if (useRestAPI) {
388+
} else if (useRestAPI && withSavePoint) {
388389
try {
389390
// Try to savepoint, if it fails, it will stop normally(尝试进行savepoint,如果失败,即普通停止)
390391
savepoint(jobId, SavePointType.CANCEL, null);
391392
return true;
392393
} catch (Exception e) {
393-
return FlinkAPI.build(config.getAddress()).stop(jobId);
394+
log.warn("Stop with savcePoint failed: {}, will try normal rest api stop", e.getMessage());
395+
return cancelNormal(jobId);
394396
}
395397
} else {
396-
try {
397-
return FlinkAPI.build(config.getAddress()).stop(jobId);
398-
} catch (Exception e) {
399-
log.error("停止作业时集群不存在: " + e);
400-
}
401-
return false;
398+
return cancelNormal(jobId);
399+
}
400+
}
401+
402+
public boolean cancelNormal(String jobId) {
403+
try {
404+
return FlinkAPI.build(config.getAddress()).stop(jobId);
405+
} catch (Exception e) {
406+
log.error("stop flink job failed:", e);
407+
throw new BusException(e.getMessage());
402408
}
403409
}
404410

dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx

+15-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
*
1818
*/
1919

20-
import { handleGetOption, handleOption } from '@/services/BusinessCrud';
20+
import {handleGetOption, handleOption} from '@/services/BusinessCrud';
21+
import {API_CONSTANTS} from "@/services/endpoints";
2122

2223
export async function explainSql(title: string, params: any) {
2324
return handleOption('/api/task/explainSql', title, params);
@@ -32,13 +33,22 @@ export async function debugTask(title: string, params: any) {
3233
}
3334

3435
export async function executeSql(title: string, id: number) {
35-
return handleGetOption('/api/task/submitTask', title, { id });
36+
return handleGetOption('/api/task/submitTask', title, {id});
3637
}
3738

38-
export function cancelTask(title: string, id: number) {
39-
return handleGetOption('api/task/cancel', title, { id });
39+
export function cancelTask(title: string, id: number, withSavePoint: boolean = true) {
40+
return handleGetOption(API_CONSTANTS.CANCEL_JOB, title, {id, withSavePoint});
4041
}
4142

43+
export function restartTask(title: string, id: number, isOnLine:boolean) {
44+
return handleGetOption(API_CONSTANTS.RESTART_TASK, title, {id, isOnLine});
45+
}
46+
export function savePointTask(title: string, taskId: number, savePointType:string) {
47+
return handleGetOption(API_CONSTANTS.SAVEPOINT, title, {taskId, savePointType});
48+
}
49+
50+
51+
4252
export function changeTaskLife(title = '', id: number, life: number) {
43-
return handleGetOption('api/task/changeTaskLife', title, { taskId: id, lifeCycle: life });
53+
return handleGetOption('api/task/changeTaskLife', title, {taskId: id, lifeCycle: life});
4454
}

dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx

+26-37
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
*
1818
*/
1919

20-
import { cancelTask } from '@/pages/DataStudio/HeaderContainer/service';
21-
import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants';
22-
import { isStatusDone } from '@/pages/DevOps/function';
23-
import { getData, postAll } from '@/services/api';
24-
import { API_CONSTANTS } from '@/services/endpoints';
25-
import { Jobs } from '@/types/DevOps/data';
26-
import { l } from '@/utils/intl';
27-
import { EllipsisOutlined, RedoOutlined } from '@ant-design/icons';
28-
import { Button, Dropdown, message, Modal, Space } from 'antd';
20+
import {cancelTask, restartTask, savePointTask} from '@/pages/DataStudio/HeaderContainer/service';
21+
import {JOB_LIFE_CYCLE} from '@/pages/DevOps/constants';
22+
import {isStatusDone} from '@/pages/DevOps/function';
23+
import {getData, postAll} from '@/services/api';
24+
import {API_CONSTANTS} from '@/services/endpoints';
25+
import {Jobs} from '@/types/DevOps/data';
26+
import {l} from '@/utils/intl';
27+
import {EllipsisOutlined, RedoOutlined} from '@ant-design/icons';
28+
import {Button, Dropdown, message, Modal, Space} from 'antd';
2929

3030
const operatorType = {
3131
RESTART_JOB: 'restart',
@@ -40,52 +40,37 @@ export type OperatorType = {
4040
refesh: (isForce: boolean) => void;
4141
};
4242
const JobOperator = (props: OperatorType) => {
43-
const { jobDetail, refesh } = props;
43+
const {jobDetail, refesh} = props;
4444
const webUri = `/api/flink/${jobDetail?.history?.jobManagerAddress}/#/job/running/${jobDetail?.instance?.jid}/overview`;
4545

4646
const handleJobOperator = (key: string) => {
4747
Modal.confirm({
48-
title: l('devops.jobinfo.job.key', '', { key: key }),
49-
content: l('devops.jobinfo.job.keyConfirm', '', { key: key }),
48+
title: l('devops.jobinfo.job.key', '', {key: key}),
49+
content: l('devops.jobinfo.job.keyConfirm', '', {key: key}),
5050
okText: l('button.confirm'),
5151
cancelText: l('button.cancel'),
5252
onOk: async () => {
5353
if (key == operatorType.CANCEL_JOB) {
54-
postAll(API_CONSTANTS.CANCEL_JOB, {
55-
clusterId: jobDetail?.clusterInstance?.id,
56-
jobId: jobDetail?.instance?.jid
57-
});
54+
cancelTask('', jobDetail?.instance?.taskId, false);
5855
} else if (key == operatorType.RESTART_JOB) {
59-
getData(API_CONSTANTS.RESTART_TASK, {
60-
id: jobDetail?.instance?.taskId,
61-
isOnLine: jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH
62-
});
56+
restartTask('', jobDetail?.instance?.taskId, jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH)
6357
} else if (key == operatorType.SAVEPOINT_CANCEL) {
64-
getData(API_CONSTANTS.SAVEPOINT, {
65-
taskId: jobDetail?.instance?.taskId,
66-
savePointType: 'cancel'
67-
});
58+
savePointTask('', jobDetail?.instance?.taskId, 'cancel')
6859
} else if (key == operatorType.SAVEPOINT_STOP) {
69-
getData(API_CONSTANTS.SAVEPOINT, {
70-
taskId: jobDetail?.instance?.taskId,
71-
savePointType: 'stop'
72-
});
60+
savePointTask('', jobDetail?.instance?.taskId, 'stop')
7361
} else if (key == operatorType.SAVEPOINT_TRIGGER) {
74-
getData(API_CONSTANTS.SAVEPOINT, {
75-
taskId: jobDetail?.instance?.taskId,
76-
savePointType: 'trigger'
77-
});
78-
} else {
62+
savePointTask('', jobDetail?.instance?.taskId, 'trigger')
63+
} else if (key == operatorType.AUTO_STOP) {
7964
cancelTask('', jobDetail?.instance?.taskId);
8065
}
81-
message.success(l('devops.jobinfo.job.key.success', '', { key: key }));
66+
message.success(l('devops.jobinfo.job.key.success', '', {key: key}));
8267
}
8368
});
8469
};
8570

8671
return (
8772
<Space>
88-
<Button icon={<RedoOutlined />} onClick={() => refesh(true)} />
73+
<Button icon={<RedoOutlined/>} onClick={() => refesh(true)}/>
8974

9075
<Button key='flinkwebui' href={webUri} target={'_blank'}>
9176
FlinkWebUI
@@ -131,12 +116,16 @@ const JobOperator = (props: OperatorType) => {
131116
{
132117
key: operatorType.SAVEPOINT_CANCEL,
133118
label: l('devops.jobinfo.savepoint.cancel')
119+
},
120+
{
121+
key: operatorType.CANCEL_JOB,
122+
label: l('devops.jobinfo.savepoint.canceljob')
134123
}
135124
]
136125
}}
137126
>
138-
<Button key='4' style={{ padding: '0 8px' }}>
139-
<EllipsisOutlined />
127+
<Button key='4' style={{padding: '0 8px'}}>
128+
<EllipsisOutlined/>
140129
</Button>
141130
</Dropdown>
142131
</>

dinky-web/src/services/endpoints.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export enum API_CONSTANTS {
239239
GET_TASKMANAGER_LIST = 'api/jobInstance/getTaskManagerList',
240240
GET_TASKMANAGER_LOG = 'api/jobInstance/getTaskManagerLog',
241241
GET_JOB_METRICS_ITEMS = 'api/jobInstance/getJobMetricsItems',
242-
CANCEL_JOB = '/api/studio/cancel',
242+
CANCEL_JOB = '/api/task/cancel',
243243
// /api/studio/getLineage
244244
STUDIO_GET_LINEAGE = '/api/studio/getLineage',
245245
// /api/jobInstance/getLineage

0 commit comments

Comments
 (0)