Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: 时间轮调度任务存取并发问题 对应的issue: #2877 #3373

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.xxl.job.admin.controller;

import com.xxl.job.admin.controller.annotation.PermissionLimit;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.List;

/**
* Created by xuxueli on 17/5/10.
*/
@Controller
@RequestMapping("/api")
public class JobApiController2 {

@Resource
private AdminBiz adminBiz;

@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (XxlJobAdminConfig.getAdminConfig().getAccessToken() != null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length() > 0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<>(ReturnT.FAIL_CODE, "The access token is wrong.");
}

// 提供的服务端交互uri 内部委托AdminBizImpl实现
switch (uri) {
case "callback":
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
case "registry":
return adminBiz.registry(GsonTool.fromJson(data, RegistryParam.class));
case "registryRemove":
return adminBiz.registryRemove(GsonTool.fromJson(data, RegistryParam.class));
default:
return new ReturnT<>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.xxl.job.admin.core.complete;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;

import java.text.MessageFormat;

/**
* @author xuxueli 2020-10-30 20:43:10
*/
public class XxlJobCompleter2 {

/**
* 完成任务入口 一个任务实例只进入一次
*
* @param xxlJobLog
* @return
*/
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {

// 完成任务 触发子任务等等
finishJob(xxlJobLog);

// text最大64kb 避免长度过长
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg().substring(0, 15000));
}

// 更新执行记录
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

private static void finishJob(XxlJobLog xxlJobLog) {
StringBuilder triggerChildMsg = null;
// 1、执行成功 触发子任务执行
if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
if (xxlJobInfo != null && xxlJobInfo.getChildJobId() != null && xxlJobInfo.getChildJobId().trim().length() > 0) {
triggerChildMsg = new StringBuilder("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_child_run") + "<<<<<<<<<<< </span><br>");
// 子任务id用','分割
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
for (int i = 0; i < childJobIds.length; i++) {
int childJobId = (childJobIds[i] != null && childJobIds[i].trim().length() > 0 && isNumeric(childJobIds[i])) ? Integer.parseInt(childJobIds[i]) : -1;
if (childJobId > 0) {
// 触发执行
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
ReturnT<String> triggerChildResult = ReturnT.SUCCESS;

// add msg
triggerChildMsg.append(MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
(i + 1),
childJobIds.length,
childJobIds[i],
(triggerChildResult.getCode() == ReturnT.SUCCESS_CODE ? I18nUtil.getString("system_success") : I18nUtil.getString("system_fail")),
triggerChildResult.getMsg()));
} else {
triggerChildMsg.append(MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
(i + 1),
childJobIds.length,
childJobIds[i]));
}
}
}
}

if (triggerChildMsg != null) {
xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg() + triggerChildMsg);
}

// 2、fix_delay trigger next
// on the way

}

private static boolean isNumeric(String str) {
try {
Integer.parseInt(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.xxl.job.admin.core.conf;

import com.xxl.job.admin.core.alarm.JobAlarmer;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.dao.*;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Arrays;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/

@Component
public class XxlJobAdminConfig2 implements InitializingBean, DisposableBean {

private static XxlJobAdminConfig2 adminConfig = null;

public static XxlJobAdminConfig2 getAdminConfig() {
return adminConfig;
}


// ---------------------- XxlJobScheduler ----------------------

private XxlJobScheduler xxlJobScheduler;

@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
// 引出XxlJobScheduler,该类是服务端初始化的关键
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}

@Override
public void destroy() throws Exception {
xxlJobScheduler.destroy();
}

// ---------------------- XxlJobScheduler ----------------------

/**
* 一些服务端配置
*/
@Value("${xxl.job.i18n}")
private String i18n;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${spring.mail.from}")
private String emailFrom;

@Value("${xxl.job.triggerpool.fast.max}")
private int triggerPoolFastMax;

@Value("${xxl.job.triggerpool.slow.max}")
private int triggerPoolSlowMax;

@Value("${xxl.job.logretentiondays}")
private int logretentiondays;

/**
* 将dao和service类在AdminConfig中统一管理
*/
@Resource
private XxlJobLogDao xxlJobLogDao;
@Resource
private XxlJobInfoDao xxlJobInfoDao;
@Resource
private XxlJobRegistryDao xxlJobRegistryDao;
@Resource
private XxlJobGroupDao xxlJobGroupDao;
@Resource
private XxlJobLogReportDao xxlJobLogReportDao;
@Resource
private JavaMailSender mailSender;
@Resource
private DataSource dataSource;
@Resource
private JobAlarmer jobAlarmer;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.xxl.job.admin.core.scheduler;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.thread.*;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.ExecutorBizClient;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* @author xuxueli 2018-10-28 00:18:17
*/

public class XxlJobScheduler2 {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler2.class);

public void init() throws Exception {
// 国际化
initI18n();

// 开启JobTrigger 该helper定义了快慢两个线程池用来执行定时任务
JobTriggerPoolHelper.toStart();

// 开启执行器注册相关的任务 其中包含注册或移除执行器的线程池以及心跳监控线程
JobRegistryHelper.getInstance().start();

// 另起线程对失败任务做处理(jobLog表) 尝试进行重试(依赖JobTriggerPoolHelper)以及预警
JobFailMonitorHelper.getInstance().start();

// 对任务执行完成后的处理 即客户端调用callback 会借助该类中的callbackThreadPool进行处理
// 同时另有一个monitor线程对处于运行中过长时间的任务进行处理
JobCompleteHelper.getInstance().start();

// 另起线程收集日志报告 主要用于报告展示 不是重点
JobLogReportHelper.getInstance().start();

// 开启调度 开启一个线程不断拉取待执行的任务 放入时间轮等待执行 时间轮线程扫描时间轮触发执行,最终依赖JobTriggerPoolHelper中的快慢线程池
JobScheduleHelper.getInstance().start();

logger.info(">>>>>>>>> init xxl-job admin success.");
}


public void destroy() throws Exception {

// stop-schedule
JobScheduleHelper.getInstance().toStop();

// admin log report stop
JobLogReportHelper.getInstance().toStop();

// admin lose-monitor stop
JobCompleteHelper.getInstance().toStop();

// admin fail-monitor stop
JobFailMonitorHelper.getInstance().toStop();

// admin registry stop
JobRegistryHelper.getInstance().toStop();

// admin trigger pool stop
JobTriggerPoolHelper.toStop();

}

// ---------------------- I18n ----------------------

private void initI18n() {
for (ExecutorBlockStrategyEnum item : ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}

// ---------------------- executor-client ----------------------
// 为每一个远程地址address 创建一个ExecutorBizClient 存入本地缓存
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<>();

public static ExecutorBiz getExecutorBiz(String address) throws Exception {
if (address == null || address.trim().length() == 0) {
return null;
}

// 加载缓存 没有则new ExecutorBizClient
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}

executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

executorBizRepository.put(address, executorBiz);
return executorBiz;
}
}
Loading