Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

同一类上配置多个@ElasticJobConf 注解 #29

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.cxytiandi.elasticjob.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.*;

import org.springframework.stereotype.Component;

/**
Expand All @@ -17,6 +15,7 @@
*
*/
@Component
@Repeatable(ElasticJobConfs.class)
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConf {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.cxytiandi.elasticjob.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author heyc
* @date 2019/5/16 15:30
*/
@Component
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfs {

/**
* 任务调度调度配置
* @return
*/
ElasticJobConf[] value();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import java.util.List;
import java.util.Map;

import com.cxytiandi.elasticjob.annotation.ElasticJobConfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -53,112 +55,123 @@ public class JobConfParser implements ApplicationContextAware {

@Autowired(required=false)
private JobService jobService;


@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
environment = ctx.getEnvironment();
// 注册 @ElasticJobConf 注解
Map<String, Object> beanMap = ctx.getBeansWithAnnotation(ElasticJobConf.class);
for (Object confBean : beanMap.values()) {
Class<?> clz = confBean.getClass();
String jobTypeName = confBean.getClass().getInterfaces()[0].getSimpleName();
ElasticJobConf conf = clz.getAnnotation(ElasticJobConf.class);

String jobClass = clz.getName();
String jobName = conf.name();
String cron = getEnvironmentStringValue(jobName, JobAttributeTag.CRON, conf.cron());
String shardingItemParameters = getEnvironmentStringValue(jobName, JobAttributeTag.SHARDING_ITEM_PARAMETERS, conf.shardingItemParameters());
String description = getEnvironmentStringValue(jobName, JobAttributeTag.DESCRIPTION, conf.description());
String jobParameter = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_PARAMETER, conf.jobParameter());
String jobExceptionHandler = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_EXCEPTION_HANDLER, conf.jobExceptionHandler());
String executorServiceHandler = getEnvironmentStringValue(jobName, JobAttributeTag.EXECUTOR_SERVICE_HANDLER, conf.executorServiceHandler());

String jobShardingStrategyClass = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_SHARDING_STRATEGY_CLASS, conf.jobShardingStrategyClass());
String eventTraceRdbDataSource = getEnvironmentStringValue(jobName, JobAttributeTag.EVENT_TRACE_RDB_DATA_SOURCE, conf.eventTraceRdbDataSource());
String scriptCommandLine = getEnvironmentStringValue(jobName, JobAttributeTag.SCRIPT_COMMAND_LINE, conf.scriptCommandLine());

boolean failover = getEnvironmentBooleanValue(jobName, JobAttributeTag.FAILOVER, conf.failover());
boolean misfire = getEnvironmentBooleanValue(jobName, JobAttributeTag.MISFIRE, conf.misfire());
boolean overwrite = getEnvironmentBooleanValue(jobName, JobAttributeTag.OVERWRITE, conf.overwrite());
boolean disabled = getEnvironmentBooleanValue(jobName, JobAttributeTag.DISABLED, conf.disabled());
boolean monitorExecution = getEnvironmentBooleanValue(jobName, JobAttributeTag.MONITOR_EXECUTION, conf.monitorExecution());
boolean streamingProcess = getEnvironmentBooleanValue(jobName, JobAttributeTag.STREAMING_PROCESS, conf.streamingProcess());

int shardingTotalCount = getEnvironmentIntValue(jobName, JobAttributeTag.SHARDING_TOTAL_COUNT, conf.shardingTotalCount());
int monitorPort = getEnvironmentIntValue(jobName, JobAttributeTag.MONITOR_PORT, conf.monitorPort());
int maxTimeDiffSeconds = getEnvironmentIntValue(jobName, JobAttributeTag.MAX_TIME_DIFF_SECONDS, conf.maxTimeDiffSeconds());
int reconcileIntervalMinutes = getEnvironmentIntValue(jobName, JobAttributeTag.RECONCILE_INTERVAL_MINUTES, conf.reconcileIntervalMinutes());

// 核心配置
JobCoreConfiguration coreConfig =
JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.description(description)
.failover(failover)
.jobParameter(jobParameter)
.misfire(misfire)
.jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
.jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
.build();

// 不同类型的任务配置处理
LiteJobConfiguration jobConfig = null;
JobTypeConfiguration typeConfig = null;
if (jobTypeName.equals("SimpleJob")) {
typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
}

if (jobTypeName.equals("DataflowJob")) {
typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
}

if (jobTypeName.equals("ScriptJob")) {
typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
}

jobConfig = LiteJobConfiguration.newBuilder(typeConfig)
.overwrite(overwrite)
.disabled(disabled)
.monitorPort(monitorPort)
.monitorExecution(monitorExecution)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.jobShardingStrategyClass(jobShardingStrategyClass)
.reconcileIntervalMinutes(reconcileIntervalMinutes)
.build();

List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(conf);

// 构建SpringJobScheduler对象来初始化任务
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
if ("ScriptJob".equals(jobTypeName)) {
factory.addConstructorArgValue(null);
} else {
factory.addConstructorArgValue(confBean);
}
factory.addConstructorArgValue(zookeeperRegistryCenter);
factory.addConstructorArgValue(jobConfig);

// 任务执行日志数据源,以名称获取
if (StringUtils.hasText(eventTraceRdbDataSource)) {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
ElasticJobConf conf = confBean.getClass().getAnnotation(ElasticJobConf.class);
registerSpringJobScheduler(confBean, conf, ctx);
}
// 注册 @ElasticJobConfs 注解
beanMap = ctx.getBeansWithAnnotation(ElasticJobConfs.class);
for (Object confBean : beanMap.values()) {
ElasticJobConfs jobConfs = confBean.getClass().getAnnotation(ElasticJobConfs.class);
for (ElasticJobConf conf : jobConfs.value()) {
registerSpringJobScheduler(confBean, conf, ctx);
}

factory.addConstructorArgValue(elasticJobListeners);
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory)ctx.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerBeanDefinition(jobName+"SpringJobScheduler", factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean(jobName+"SpringJobScheduler");
springJobScheduler.init();
logger.info("【" + jobName + "】\t" + jobClass + "\tinit success");
}

//开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
if (jobService != null) {
jobService.monitorJobRegister();
}

}

private void registerSpringJobScheduler(Object confBean, ElasticJobConf conf, ApplicationContext ctx) {
String jobTypeName = confBean.getClass().getInterfaces()[0].getSimpleName();
String jobClass = confBean.getClass().getName();
String jobName = conf.name();
String cron = getEnvironmentStringValue(jobName, JobAttributeTag.CRON, conf.cron());
String shardingItemParameters = getEnvironmentStringValue(jobName, JobAttributeTag.SHARDING_ITEM_PARAMETERS, conf.shardingItemParameters());
String description = getEnvironmentStringValue(jobName, JobAttributeTag.DESCRIPTION, conf.description());
String jobParameter = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_PARAMETER, conf.jobParameter());
String jobExceptionHandler = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_EXCEPTION_HANDLER, conf.jobExceptionHandler());
String executorServiceHandler = getEnvironmentStringValue(jobName, JobAttributeTag.EXECUTOR_SERVICE_HANDLER, conf.executorServiceHandler());

String jobShardingStrategyClass = getEnvironmentStringValue(jobName, JobAttributeTag.JOB_SHARDING_STRATEGY_CLASS, conf.jobShardingStrategyClass());
String eventTraceRdbDataSource = getEnvironmentStringValue(jobName, JobAttributeTag.EVENT_TRACE_RDB_DATA_SOURCE, conf.eventTraceRdbDataSource());
String scriptCommandLine = getEnvironmentStringValue(jobName, JobAttributeTag.SCRIPT_COMMAND_LINE, conf.scriptCommandLine());

boolean failover = getEnvironmentBooleanValue(jobName, JobAttributeTag.FAILOVER, conf.failover());
boolean misfire = getEnvironmentBooleanValue(jobName, JobAttributeTag.MISFIRE, conf.misfire());
boolean overwrite = getEnvironmentBooleanValue(jobName, JobAttributeTag.OVERWRITE, conf.overwrite());
boolean disabled = getEnvironmentBooleanValue(jobName, JobAttributeTag.DISABLED, conf.disabled());
boolean monitorExecution = getEnvironmentBooleanValue(jobName, JobAttributeTag.MONITOR_EXECUTION, conf.monitorExecution());
boolean streamingProcess = getEnvironmentBooleanValue(jobName, JobAttributeTag.STREAMING_PROCESS, conf.streamingProcess());

int shardingTotalCount = getEnvironmentIntValue(jobName, JobAttributeTag.SHARDING_TOTAL_COUNT, conf.shardingTotalCount());
int monitorPort = getEnvironmentIntValue(jobName, JobAttributeTag.MONITOR_PORT, conf.monitorPort());
int maxTimeDiffSeconds = getEnvironmentIntValue(jobName, JobAttributeTag.MAX_TIME_DIFF_SECONDS, conf.maxTimeDiffSeconds());
int reconcileIntervalMinutes = getEnvironmentIntValue(jobName, JobAttributeTag.RECONCILE_INTERVAL_MINUTES, conf.reconcileIntervalMinutes());

// 核心配置
JobCoreConfiguration coreConfig =
JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.description(description)
.failover(failover)
.jobParameter(jobParameter)
.misfire(misfire)
.jobProperties(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
.jobProperties(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
.build();

// 不同类型的任务配置处理
LiteJobConfiguration jobConfig = null;
JobTypeConfiguration typeConfig = null;
if (jobTypeName.equals("SimpleJob")) {
typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
}

if (jobTypeName.equals("DataflowJob")) {
typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
}

if (jobTypeName.equals("ScriptJob")) {
typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
}

jobConfig = LiteJobConfiguration.newBuilder(typeConfig)
.overwrite(overwrite)
.disabled(disabled)
.monitorPort(monitorPort)
.monitorExecution(monitorExecution)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.jobShardingStrategyClass(jobShardingStrategyClass)
.reconcileIntervalMinutes(reconcileIntervalMinutes)
.build();

List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(conf);

// 构建SpringJobScheduler对象来初始化任务
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
if ("ScriptJob".equals(jobTypeName)) {
factory.addConstructorArgValue(null);
} else {
factory.addConstructorArgValue(confBean);
}
factory.addConstructorArgValue(zookeeperRegistryCenter);
factory.addConstructorArgValue(jobConfig);

// 任务执行日志数据源,以名称获取
if (StringUtils.hasText(eventTraceRdbDataSource)) {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
}

factory.addConstructorArgValue(elasticJobListeners);
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory)ctx.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerBeanDefinition(jobName+"SpringJobScheduler", factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean(jobName+"SpringJobScheduler");
springJobScheduler.init();
logger.info("【" + jobName + "】\t" + jobClass + "\tinit success");
}

private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConf conf) {
List<BeanDefinition> result = new ManagedList<BeanDefinition>(2);
String listeners = getEnvironmentStringValue(conf.name(), JobAttributeTag.LISTENER, conf.listener());
Expand Down