Skip to content

Commit

Permalink
完善动态任务添加
Browse files Browse the repository at this point in the history
  • Loading branch information
尹吉欢 committed Mar 25, 2018
1 parent db71628 commit 6668561
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@SpringBootApplication
@EnableElasticJob
//开启动态任务添加API
@ComponentScan(basePackages = {"com.cxytiandi.elasticjob.dynamic"})
@ComponentScan(basePackages = {"com.cxytiandi"})
public class JobApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public class Job {
* 作业名称
* @return
*/
private String name;
private String jobName;

/**
* 作业类型(SimpleJob,DataflowJob,ScriptJob
* 作业类型(SIMPLE,DATAFLOW,SCRIPT
*/
private String jobTypeName;
private String jobType;

/**
* 任务类路径
Expand Down Expand Up @@ -180,15 +180,7 @@ public class Job {
private long completedTimeoutMilliseconds = Long.MAX_VALUE;

/**
* 自定义异常处理类
* @return
*/
private String jobExceptionHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

/**
* 自定义业务处理线程池
* @return
* 扩展属性
*/
private String executorServiceHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";

private JobProperties jobProperties = new JobProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.cxytiandi.elasticjob.dynamic.bean;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Getter;
@Getter
public class JobProperties {

/**
* 自定义异常处理类
* @return
*/
@JsonProperty("job_exception_handler")
private String jobExceptionHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

/**
* 自定义业务处理线程池
* @return
*/
@JsonProperty("executor_service_handler")
private String executorServiceHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -28,12 +29,17 @@ public class JobController {
@Autowired
private JobService jobService;

/**
* 添加动态任务(适用于脚本逻辑已存在的情况,只是动态添加了触发的时间)
* @param job 任务信息
* @return
*/
@PostMapping("/job")
public Object addJob(@RequestBody Job job) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", true);

if (!StringUtils.hasText(job.getName())) {
if (!StringUtils.hasText(job.getJobName())) {
result.put("status", false);
result.put("message", "name not null");
return result;
Expand All @@ -45,13 +51,13 @@ public Object addJob(@RequestBody Job job) {
return result;
}

if (!StringUtils.hasText(job.getJobTypeName())) {
if (!StringUtils.hasText(job.getJobType())) {
result.put("status", false);
result.put("message", "jobTypeName not null");
result.put("message", "getJobType not null");
return result;
}

if ("ScriptJob".equals(job.getJobTypeName())) {
if ("ScriptJob".equals(job.getJobType())) {
if (!StringUtils.hasText(job.getScriptCommandLine())) {
result.put("status", false);
result.put("message", "scriptCommandLine not null");
Expand All @@ -68,11 +74,27 @@ public Object addJob(@RequestBody Job job) {
try {
jobService.addJob(job);
} catch (Exception e) {
e.printStackTrace();
result.put("status", false);
result.put("message", e.getMessage());
}
return result;
}

/**
* 删除动态注册的任务(只删除注册中心中的任务信息)
* @param jobName 任务名称
* @throws Exception
*/
@GetMapping("/job/remove")
public Object removeJob(String jobName) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", true);
try {
jobService.removeJob(jobName);
} catch (Exception e) {
result.put("status", false);
result.put("message", e.getMessage());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -12,7 +18,9 @@
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import com.cxytiandi.elasticjob.dynamic.bean.Job;
import com.cxytiandi.elasticjob.dynamic.util.JsonUtils;
import com.cxytiandi.elasticjob.parser.JobConfParser;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
Expand All @@ -37,32 +45,31 @@ public class JobService {
private ApplicationContext ctx;

public void addJob(Job job) {
System.out.println(ctx);
// 核心配置
JobCoreConfiguration coreConfig =
JobCoreConfiguration.newBuilder(job.getName(), job.getCron(), job.getShardingTotalCount())
JobCoreConfiguration.newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount())
.shardingItemParameters(job.getShardingItemParameters())
.description(job.getDescription())
.failover(job.isFailover())
.jobParameter(job.getJobParameter())
.misfire(job.isMisfire())
.jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobExceptionHandler())
.jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getExecutorServiceHandler())
.jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().getJobExceptionHandler())
.jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().getExecutorServiceHandler())
.build();

// 不同类型的任务配置处理
LiteJobConfiguration jobConfig = null;
JobTypeConfiguration typeConfig = null;
String jobTypeName = job.getJobTypeName();
if (jobTypeName.equals("SimpleJob")) {
String jobType = job.getJobType();
if (jobType.equals("SIMPLE")) {
typeConfig = new SimpleJobConfiguration(coreConfig, job.getJobClass());
}

if (jobTypeName.equals("DataflowJob")) {
if (jobType.equals("DATAFLOW")) {
typeConfig = new DataflowJobConfiguration(coreConfig, job.getJobClass(), job.isStreamingProcess());
}

if (jobTypeName.equals("ScriptJob")) {
if (jobType.equals("SCRIPT")) {
typeConfig = new ScriptJobConfiguration(coreConfig, job.getScriptCommandLine());
}

Expand All @@ -82,7 +89,7 @@ public void addJob(Job job) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
if ("ScriptJob".equals(jobTypeName)) {
if ("SCRIPT".equals(jobType)) {
factory.addConstructorArgValue(null);
} else {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(job.getJobClass());
Expand All @@ -103,7 +110,7 @@ public void addJob(Job job) {
defaultListableBeanFactory.registerBeanDefinition("SpringJobScheduler", factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean("SpringJobScheduler");
springJobScheduler.init();
logger.info("【" + job.getName() + "】\t" + job.getJobClass() + "\tinit success");
logger.info("【" + job.getJobName() + "】\t" + job.getJobClass() + "\tinit success");
}

private List<BeanDefinition> getTargetElasticJobListeners(Job job) {
Expand All @@ -128,4 +135,39 @@ private List<BeanDefinition> getTargetElasticJobListeners(Job job) {
}
return result;
}


public void removeJob(String jobName) throws Exception {
CuratorFramework client = zookeeperRegistryCenter.getClient();
client.delete().deletingChildrenIfNeeded().forPath("/" + jobName);
}

/**
* 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
*/
public void monitorJobRegister() {
CuratorFramework client = zookeeperRegistryCenter.getClient();
@SuppressWarnings("resource")
PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
String config = new String(client.getData().forPath(data.getPath() + "/config"));
Job job = JsonUtils.toBean(Job.class, config);
addJob(job);
break;
default:
break;
}
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
try {
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.cxytiandi.elasticjob.dynamic.util;

import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Json 工具类
* @author yinjihuan
*
*/
public class JsonUtils {
private static ObjectMapper mapper = new ObjectMapper();

public static String toString(Object obj){
return toJson(obj);
}

public static String toJson(Object obj){
try{
StringWriter writer = new StringWriter();
mapper.writeValue(writer, obj);
return writer.toString();
}catch(Exception e){
throw new RuntimeException("序列化对象【"+obj+"】时出错", e);
}
}

public static <T> T toBean(Class<T> entityClass, String jsonString){
try {
return mapper.readValue(jsonString, entityClass);
} catch (Exception e) {
throw new RuntimeException("JSON【"+jsonString+"】转对象时出错", e);
}
}

/**
* 用于对象通过其他工具已转为JSON的字符形式,这里不需要再加上引号
* @param obj
* @param isObject
*/
public static String getJsonSuccess(String obj, boolean isObject){
String jsonString = null;
if(obj == null){
jsonString = "{\"success\":true}";
}else{
jsonString = "{\"success\":true,\"data\":"+obj+"}";
}
return jsonString;
}

public static String getJsonSuccess(Object obj){
return getJsonSuccess(obj, null);
}

public static String getJsonSuccess(Object obj, String message) {
if(obj == null){
return "{\"success\":true,\"message\":\""+message+"\"}";
}else{
try{
Map<String, Object> map = new HashMap<String, Object>();
map.put("success", true);
return "{\"success\":true,"+toString(obj)+",\"message\":\""+message+"\"}";
}catch(Exception e){
throw new RuntimeException("序列化对象【"+obj+"】时出错", e);
}
}
}

public static String getJsonError(Object obj){
return getJsonError(obj, null);
}

public static String getJsonError(Object obj, String message) {
if(obj == null){
return "{\"success\":false,\"message\":\""+message+"\"}";
}else{
try{
obj = parseIfException(obj);
return "{\"success\":false,\"data\":"+toString(obj)+",\"message\":\""+message+"\"}";
}catch(Exception e){
throw new RuntimeException("序列化对象【"+obj+"】时出错", e);
}
}
}

public static Object parseIfException(Object obj){
if(obj instanceof Exception){
return getErrorMessage((Exception) obj, null);
}
return obj;
}

public static String getErrorMessage(Exception e, String defaultMessage){
return defaultMessage != null ? defaultMessage : null;
}

public static ObjectMapper getMapper() {
return mapper;
}
}
Loading

0 comments on commit 6668561

Please sign in to comment.