Skip to content

Commit

Permalink
Support init task value (#2724)
Browse files Browse the repository at this point in the history
* Spotless Apply

* Spotless Apply

* support-init-task-value

* Spotless Apply

---------

Co-authored-by: Zzm0809 <[email protected]>
  • Loading branch information
Zzm0809 and Zzm0809 authored Dec 23, 2023
1 parent be7b770 commit 2d54ad7
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public Result<List<Catalogue>> getCatalogueTree() {
dataType = "CatalogueTaskDTO",
dataTypeClass = CatalogueTaskDTO.class)
public Result<Catalogue> createTask(@RequestBody CatalogueTaskDTO catalogueTaskDTO) {
if (catalogueService.checkCatalogueTaskNameIsExist(catalogueTaskDTO.getName())) {
return Result.failed(Status.TASK_IS_EXIST);
}
Catalogue catalogue = catalogueService.saveOrUpdateCatalogueAndTask(catalogueTaskDTO);
if (catalogue.getId() != null) {
return Result.succeed(catalogue, Status.SAVE_SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ public class CatalogueTaskDTO {
dataType = "TaskExtConfig",
notes = "The task's extended configuration in JSON format")
private TaskExtConfig configJson;

@ApiModelProperty(value = "Task", dataType = "TaskDTO", notes = "The task information")
private TaskDTO task;
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,11 @@ public interface CatalogueService extends ISuperService<Catalogue> {
* @return A boolean value indicating whether the operation was successful.
*/
Boolean saveOrUpdateOrRename(Catalogue catalogue);

/**
* Check if the catalogue task name is exist
* @param name catalogue task name
* @return true if the catalogue task name is exist
*/
boolean checkCatalogueTaskNameIsExist(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.dinky.assertion.Asserts.isNull;

import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.data.dto.CatalogueTaskDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.Status;
Expand All @@ -31,6 +32,7 @@
import org.dinky.data.model.job.JobHistory;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.Result;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.mapper.CatalogueMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.CatalogueService;
Expand Down Expand Up @@ -58,6 +60,7 @@

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -86,7 +89,8 @@ public List<Catalogue> getCatalogueTree() {
}

/**
* build catalogue tree
* build catalogue tree
*
* @param catalogueList catalogue list
* @return catalogue tree
*/
Expand Down Expand Up @@ -114,6 +118,7 @@ public List<Catalogue> buildCatalogueTree(List<Catalogue> catalogueList) {

/**
* recursion build catalogue and children
*
* @param list
* @param catalogues
*/
Expand All @@ -140,6 +145,7 @@ private void recursionBuildCatalogueAndChildren(List<Catalogue> list, Catalogue

/**
* Determine whether there are child nodes
*
* @param list
* @param catalogue
* @return
Expand All @@ -150,6 +156,7 @@ private boolean hasChild(List<Catalogue> list, Catalogue catalogue) {

/**
* get child list
*
* @param list
* @param catalogue
* @return
Expand All @@ -171,13 +178,46 @@ public Catalogue findByParentIdAndName(Integer parentId, String name) {
.eq(Catalogue::getName, name));
}

/**
* check catalogue task name is exist
* @param name name
* @return true if exist , otherwise false
*/
@Override
public boolean checkCatalogueTaskNameIsExist(String name) {
return getBaseMapper().exists(new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getName, name));
}

/**
* init some value
* @param catalogueTask {@link CatalogueTaskDTO}
* @return {@link Task}
*/
private Task initTaskValue(CatalogueTaskDTO catalogueTask) {
Task task = new Task();
if (Opt.ofNullable(catalogueTask.getTask()).isPresent()) {
task = catalogueTask.getTask().buildTask();
} else {
task.setStep(JobLifeCycle.DEVELOP.getValue());
task.setEnabled(true);
if (Dialect.isFlinkSql(catalogueTask.getType(), false)) {
task.setType(GatewayType.LOCAL.getLongValue());
task.setParallelism(1);
task.setSavePointStrategy(0); // 0 is disabled
task.setEnvId(-1); // -1 is disabled
task.setAlertGroupId(-1); // -1 is disabled
}
}
return task;
}

@Transactional(rollbackFor = Exception.class)
@Override
public Catalogue saveOrUpdateCatalogueAndTask(CatalogueTaskDTO catalogueTaskDTO) {
Task task = null;
Catalogue catalogue = null;
if (catalogueTaskDTO.getId() == null) {
task = new Task();
task = initTaskValue(catalogueTaskDTO);
catalogue = new Catalogue();
} else {
catalogue = baseMapper.selectById(catalogueTaskDTO.getId());
Expand Down Expand Up @@ -439,10 +479,11 @@ public Result<Void> deleteCatalogueById(Integer catalogueId) {

/**
* <p>
* 1. save catalogue
* 2. save task
* 3. save statement
* 4. rename
* 1. save catalogue
* 2. save task
* 3. save statement
* 4. rename
*
* @param catalogue
* @return
*/
Expand Down
13 changes: 13 additions & 0 deletions dinky-common/src/main/java/org/dinky/config/Dialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ public static boolean isUDF(String value) {
}
}

public static boolean isFlinkSql(String value, boolean includeFlinksqlEnv) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
case FLINK_SQL:
case FLINK_JAR:
return true;
case FLINK_SQL_ENV:
return includeFlinksqlEnv;
default:
return false;
}
}

public static boolean isJarDialect(String value) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public enum Status {
TASK_SQL_EXPLAN_FAILED(12007, "task.sql.explain.failed"),
TASK_UPDATE_FAILED(12008, "task.update.failed"),
TASK_IS_ONLINE(12009, "task.is.online"),
TASK_IS_EXIST(12010, "task.is.existed"),

/**
* alert instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ failed=Failed
added.failed=Added Failed
task.not.exist=Task Not Exist
task.is.online= Task is online, modification is prohibited
task.is.existed=Task is existed
cluster.instance.deploy=Deploy Success
clear.failed=Clear Failed
rename.success=Rename Successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ failed=获取失败
added.failed=新增失败
task.not.exist=任务不存在
task.is.online=任务已上线,禁止修改
task.is.existed=作业已存在
cluster.instance.deploy=部署完成
clear.failed=清除失败
rename.success=重命名成功
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import { FormContextValue } from '@/components/Context/FormContext';
import { JOB_TYPE } from '@/pages/DataStudio/LeftContainer/Project/constants';
import { isUDF } from '@/pages/DataStudio/LeftContainer/Project/function';
import { isFlinkJob, isUDF } from '@/pages/DataStudio/LeftContainer/Project/function';
import { queryDataByParams } from '@/services/BusinessCrud';
import { RUN_MODE } from '@/services/constants';
import { API_CONSTANTS } from '@/services/endpoints';
import { Catalogue } from '@/types/Studio/data';
import { l } from '@/utils/intl';
Expand Down Expand Up @@ -109,7 +110,21 @@ const JobModal: React.FC<JobModalProps> = (props) => {
const { selectKeys } = formData.configJson.udfConfig;
formData.configJson.udfConfig.templateId = selectKeys[selectKeys.length - 1];
}
onSubmit({ ...values, ...formData } as Catalogue);
// if this type is flink job, init task value and submit
if (isFlinkJob(formData.type ?? '')) {
const initTaskValue = {
savePointStrategy: -1, // -1 is disabled
parallelism: 1, // default parallelism
envId: -1, // -1 is disabled
step: 1, // default step is develop
alertGroupId: -1, // -1 is disabled
type: RUN_MODE.LOCAL, // default run mode is local
dialect: formData.type
};
onSubmit({ ...values, ...formData, task: initTaskValue } as Catalogue);
} else {
onSubmit({ ...values, ...formData } as Catalogue);
}
};

/**
Expand Down
2 changes: 0 additions & 2 deletions dinky-web/src/types/Studio/data.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ export type Task = {
clusterId: number;
clusterConfigurationId: number;
databaseId: number;
jarId: number;
envId: number;
alertGroupId: number;
note: string;
Expand All @@ -78,7 +77,6 @@ export type Task = {
savePoints: SavePoint[];
configJson: TaskExtConfig;
path: string;
jarName: string;
clusterConfigurationName: string;
databaseName: string;
envName: string;
Expand Down

0 comments on commit 2d54ad7

Please sign in to comment.