From 66685615d7bdc5ebbba2cb0fbbf9bead16051406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=B9=E5=90=89=E6=AC=A2?= Date: Sun, 25 Mar 2018 16:20:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=8A=A8=E6=80=81=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/cxytiandi/job/JobApplication.java | 2 +- .../elasticjob/dynamic/bean/Job.java | 18 +--- .../dynamic/bean/JobProperties.java | 23 ++++ .../dynamic/controller/JobController.java | 32 +++++- .../dynamic/service/JobService.java | 62 +++++++++-- .../elasticjob/dynamic/util/JsonUtils.java | 101 ++++++++++++++++++ .../elasticjob/parser/JobConfParser.java | 11 +- 7 files changed, 219 insertions(+), 30 deletions(-) create mode 100644 spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/JobProperties.java create mode 100644 spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/util/JsonUtils.java diff --git a/elastic-job-spring-boot-example/src/main/java/com/cxytiandi/job/JobApplication.java b/elastic-job-spring-boot-example/src/main/java/com/cxytiandi/job/JobApplication.java index f5b47af..c448ea1 100644 --- a/elastic-job-spring-boot-example/src/main/java/com/cxytiandi/job/JobApplication.java +++ b/elastic-job-spring-boot-example/src/main/java/com/cxytiandi/job/JobApplication.java @@ -18,7 +18,7 @@ @SpringBootApplication @EnableElasticJob //开启动态任务添加API -@ComponentScan(basePackages = {"com.cxytiandi.elasticjob.dynamic"}) +@ComponentScan(basePackages = {"com.cxytiandi"}) public class JobApplication { public static void main(String[] args) { diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java index e54e91d..44a0be6 100644 --- a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java @@ -11,12 +11,12 @@ public class Job { * 作业名称 * @return */ - private String name; + private String jobName; /** - * 作业类型(SimpleJob,DataflowJob,ScriptJob) + * 作业类型(SIMPLE,DATAFLOW,SCRIPT) */ - private String jobTypeName; + private String jobType; /** * 任务类路径 @@ -180,15 +180,7 @@ public class Job { private long completedTimeoutMilliseconds = Long.MAX_VALUE; /** - * 自定义异常处理类 - * @return - */ - private String jobExceptionHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler"; - - /** - * 自定义业务处理线程池 - * @return + * 扩展属性 */ - private String executorServiceHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"; - + private JobProperties jobProperties = new JobProperties(); } diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/JobProperties.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/JobProperties.java new file mode 100644 index 0000000..62508ed --- /dev/null +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/JobProperties.java @@ -0,0 +1,23 @@ +package com.cxytiandi.elasticjob.dynamic.bean; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Getter; +@Getter +public class JobProperties { + + /** + * 自定义异常处理类 + * @return + */ + @JsonProperty("job_exception_handler") + private String jobExceptionHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler"; + + /** + * 自定义业务处理线程池 + * @return + */ + @JsonProperty("executor_service_handler") + private String executorServiceHandler = "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"; + +} diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/controller/JobController.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/controller/JobController.java index 9fdcc90..637324f 100644 --- a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/controller/JobController.java +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/controller/JobController.java @@ -5,6 +5,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -28,12 +29,17 @@ public class JobController { @Autowired private JobService jobService; + /** + * 添加动态任务(适用于脚本逻辑已存在的情况,只是动态添加了触发的时间) + * @param job 任务信息 + * @return + */ @PostMapping("/job") public Object addJob(@RequestBody Job job) { Map result = new HashMap(); result.put("status", true); - if (!StringUtils.hasText(job.getName())) { + if (!StringUtils.hasText(job.getJobName())) { result.put("status", false); result.put("message", "name not null"); return result; @@ -45,13 +51,13 @@ public Object addJob(@RequestBody Job job) { return result; } - if (!StringUtils.hasText(job.getJobTypeName())) { + if (!StringUtils.hasText(job.getJobType())) { result.put("status", false); - result.put("message", "jobTypeName not null"); + result.put("message", "getJobType not null"); return result; } - if ("ScriptJob".equals(job.getJobTypeName())) { + if ("ScriptJob".equals(job.getJobType())) { if (!StringUtils.hasText(job.getScriptCommandLine())) { result.put("status", false); result.put("message", "scriptCommandLine not null"); @@ -68,11 +74,27 @@ public Object addJob(@RequestBody Job job) { try { jobService.addJob(job); } catch (Exception e) { - e.printStackTrace(); result.put("status", false); result.put("message", e.getMessage()); } return result; } + /** + * 删除动态注册的任务(只删除注册中心中的任务信息) + * @param jobName 任务名称 + * @throws Exception + */ + @GetMapping("/job/remove") + public Object removeJob(String jobName) { + Map result = new HashMap(); + result.put("status", true); + try { + jobService.removeJob(jobName); + } catch (Exception e) { + result.put("status", false); + result.put("message", e.getMessage()); + } + return result; + } } diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/service/JobService.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/service/JobService.java index c8dcff9..e3c6c79 100644 --- a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/service/JobService.java +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/service/JobService.java @@ -2,6 +2,12 @@ import java.util.List; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -12,7 +18,9 @@ import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; + import com.cxytiandi.elasticjob.dynamic.bean.Job; +import com.cxytiandi.elasticjob.dynamic.util.JsonUtils; import com.cxytiandi.elasticjob.parser.JobConfParser; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobTypeConfiguration; @@ -37,32 +45,31 @@ public class JobService { private ApplicationContext ctx; public void addJob(Job job) { - System.out.println(ctx); // 核心配置 JobCoreConfiguration coreConfig = - JobCoreConfiguration.newBuilder(job.getName(), job.getCron(), job.getShardingTotalCount()) + JobCoreConfiguration.newBuilder(job.getJobName(), job.getCron(), job.getShardingTotalCount()) .shardingItemParameters(job.getShardingItemParameters()) .description(job.getDescription()) .failover(job.isFailover()) .jobParameter(job.getJobParameter()) .misfire(job.isMisfire()) - .jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobExceptionHandler()) - .jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getExecutorServiceHandler()) + .jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), job.getJobProperties().getJobExceptionHandler()) + .jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), job.getJobProperties().getExecutorServiceHandler()) .build(); // 不同类型的任务配置处理 LiteJobConfiguration jobConfig = null; JobTypeConfiguration typeConfig = null; - String jobTypeName = job.getJobTypeName(); - if (jobTypeName.equals("SimpleJob")) { + String jobType = job.getJobType(); + if (jobType.equals("SIMPLE")) { typeConfig = new SimpleJobConfiguration(coreConfig, job.getJobClass()); } - if (jobTypeName.equals("DataflowJob")) { + if (jobType.equals("DATAFLOW")) { typeConfig = new DataflowJobConfiguration(coreConfig, job.getJobClass(), job.isStreamingProcess()); } - if (jobTypeName.equals("ScriptJob")) { + if (jobType.equals("SCRIPT")) { typeConfig = new ScriptJobConfiguration(coreConfig, job.getScriptCommandLine()); } @@ -82,7 +89,7 @@ public void addJob(Job job) { BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class); factory.setInitMethodName("init"); factory.setScope(BeanDefinition.SCOPE_PROTOTYPE); - if ("ScriptJob".equals(jobTypeName)) { + if ("SCRIPT".equals(jobType)) { factory.addConstructorArgValue(null); } else { BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(job.getJobClass()); @@ -103,7 +110,7 @@ public void addJob(Job job) { defaultListableBeanFactory.registerBeanDefinition("SpringJobScheduler", factory.getBeanDefinition()); SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean("SpringJobScheduler"); springJobScheduler.init(); - logger.info("【" + job.getName() + "】\t" + job.getJobClass() + "\tinit success"); + logger.info("【" + job.getJobName() + "】\t" + job.getJobClass() + "\tinit success"); } private List getTargetElasticJobListeners(Job job) { @@ -128,4 +135,39 @@ private List getTargetElasticJobListeners(Job job) { } return result; } + + + public void removeJob(String jobName) throws Exception { + CuratorFramework client = zookeeperRegistryCenter.getClient(); + client.delete().deletingChildrenIfNeeded().forPath("/" + jobName); + } + + /** + * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务 + */ + public void monitorJobRegister() { + CuratorFramework client = zookeeperRegistryCenter.getClient(); + @SuppressWarnings("resource") + PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true); + PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + ChildData data = event.getData(); + switch (event.getType()) { + case CHILD_ADDED: + String config = new String(client.getData().forPath(data.getPath() + "/config")); + Job job = JsonUtils.toBean(Job.class, config); + addJob(job); + break; + default: + break; + } + } + }; + childrenCache.getListenable().addListener(childrenCacheListener); + try { + childrenCache.start(StartMode.POST_INITIALIZED_EVENT); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/util/JsonUtils.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/util/JsonUtils.java new file mode 100644 index 0000000..ec788ed --- /dev/null +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/util/JsonUtils.java @@ -0,0 +1,101 @@ +package com.cxytiandi.elasticjob.dynamic.util; + +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; +/** + * Json 工具类 + * @author yinjihuan + * + */ +public class JsonUtils { + private static ObjectMapper mapper = new ObjectMapper(); + + public static String toString(Object obj){ + return toJson(obj); + } + + public static String toJson(Object obj){ + try{ + StringWriter writer = new StringWriter(); + mapper.writeValue(writer, obj); + return writer.toString(); + }catch(Exception e){ + throw new RuntimeException("序列化对象【"+obj+"】时出错", e); + } + } + + public static T toBean(Class entityClass, String jsonString){ + try { + return mapper.readValue(jsonString, entityClass); + } catch (Exception e) { + throw new RuntimeException("JSON【"+jsonString+"】转对象时出错", e); + } + } + + /** + * 用于对象通过其他工具已转为JSON的字符形式,这里不需要再加上引号 + * @param obj + * @param isObject + */ + public static String getJsonSuccess(String obj, boolean isObject){ + String jsonString = null; + if(obj == null){ + jsonString = "{\"success\":true}"; + }else{ + jsonString = "{\"success\":true,\"data\":"+obj+"}"; + } + return jsonString; + } + + public static String getJsonSuccess(Object obj){ + return getJsonSuccess(obj, null); + } + + public static String getJsonSuccess(Object obj, String message) { + if(obj == null){ + return "{\"success\":true,\"message\":\""+message+"\"}"; + }else{ + try{ + Map map = new HashMap(); + map.put("success", true); + return "{\"success\":true,"+toString(obj)+",\"message\":\""+message+"\"}"; + }catch(Exception e){ + throw new RuntimeException("序列化对象【"+obj+"】时出错", e); + } + } + } + + public static String getJsonError(Object obj){ + return getJsonError(obj, null); + } + + public static String getJsonError(Object obj, String message) { + if(obj == null){ + return "{\"success\":false,\"message\":\""+message+"\"}"; + }else{ + try{ + obj = parseIfException(obj); + return "{\"success\":false,\"data\":"+toString(obj)+",\"message\":\""+message+"\"}"; + }catch(Exception e){ + throw new RuntimeException("序列化对象【"+obj+"】时出错", e); + } + } + } + + public static Object parseIfException(Object obj){ + if(obj instanceof Exception){ + return getErrorMessage((Exception) obj, null); + } + return obj; + } + + public static String getErrorMessage(Exception e, String defaultMessage){ + return defaultMessage != null ? defaultMessage : null; + } + + public static ObjectMapper getMapper() { + return mapper; + } +} \ No newline at end of file diff --git a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/parser/JobConfParser.java b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/parser/JobConfParser.java index 5254f9e..76bd4c5 100644 --- a/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/parser/JobConfParser.java +++ b/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/parser/JobConfParser.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -21,6 +20,7 @@ import com.cxytiandi.elasticjob.annotation.ElasticJobConf; import com.cxytiandi.elasticjob.base.JobAttributeTag; +import com.cxytiandi.elasticjob.dynamic.service.JobService; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobTypeConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; @@ -53,6 +53,9 @@ public class JobConfParser implements ApplicationContextAware { private Environment environment; + @Autowired(required=false) + private JobService jobService; + public void setApplicationContext(ApplicationContext ctx) throws BeansException { environment = ctx.getEnvironment(); Map beanMap = ctx.getBeansWithAnnotation(ElasticJobConf.class); @@ -151,6 +154,12 @@ public void setApplicationContext(ApplicationContext ctx) throws BeansException springJobScheduler.init(); logger.info("【" + jobName + "】\t" + jobClass + "\tinit success"); } + + //开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务 + if (jobService != null) { + jobService.monitorJobRegister(); + } + } private List getTargetElasticJobListeners(ElasticJobConf conf) {