Skip to content

Commit

Permalink
Merge pull request #226 from felixncheng/master
Browse files Browse the repository at this point in the history
feat: 优化schedule #221
  • Loading branch information
felixncheng authored Oct 9, 2024
2 parents 05f05f6 + e5c348e commit cbff03a
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ data class JobLog(
var triggerCode: Int = ExecutionCodeEnum.INITIALED.code(),
var triggerMsg: String? = null,
var triggerType: Int,
var scheduledFireTime: LocalDateTime,

/**
* handle信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import com.tencent.devops.schedule.pojo.page.BasePageRequest
import java.time.LocalDateTime

class LogQueryParam(
var jobId: String? = null,
var triggerTime: List<String>? = null,
var triggerTimeFrom: LocalDateTime? = null,
var triggerTimeTo: LocalDateTime? = null
): BasePageRequest()
var jobId: String? = null,
var triggerTime: List<String>? = null,
var executionCode: Int? = null,
var triggerCode: Int? = null,
var triggerTimeFrom: LocalDateTime? = null,
var triggerTimeTo: LocalDateTime? = null,
) : BasePageRequest()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ data class TriggerParam(
var logId: String,
@JsonFormat(pattern = DATE_TIME_PATTERN)
var triggerTime: LocalDateTime,
@JsonFormat(pattern = DATE_TIME_PATTERN)
var scheduledFireTime: LocalDateTime,
var broadcastIndex: Int = 0,
var broadcastTotal: Int = 0,
var workerAddress: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ fun TJobLog.convert(): JobLog {
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus,
scheduledFireTime = scheduledFireTime,
)
}

Expand All @@ -116,6 +117,7 @@ fun JobLog.convert(): TJobLog {
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus,
scheduledFireTime = scheduledFireTime,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ data class TJobLog(
var triggerCode: Int,
var triggerMsg: String? = null,
var triggerType: Int,
var scheduledFireTime: LocalDateTime,

/**
* execution信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class MongoJobProvider(
} ?: run {
triggerTimeTo?.let { to -> criteria.and(TJobLog::triggerTime).lte(to) }
}
executionCode?.let { criteria.and(TJobLog::executionCode).isEqualTo(it) }
triggerCode?.let { criteria.and(TJobLog::triggerCode).isEqualTo(it) }
}
val query = Query.query(criteria).with(Sort.by(Sort.Direction.DESC, TJobLog::triggerTime.name))
val total = mongoTemplate.count(query, TJobLog::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.DisposableBean
import org.springframework.beans.factory.InitializingBean
import org.springframework.context.ApplicationEventPublisher
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -108,7 +110,7 @@ class DefaultJobScheduler(
logger.debug("prepare trigger job[{}]", triggerContext)
triggerThreadPool.execute {
val startTime = System.currentTimeMillis()
val triggerTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
val fireTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
try {
val job = triggerContext.job
jobParam?.let { job.jobParam = it }
Expand All @@ -134,10 +136,10 @@ class DefaultJobScheduler(
jobId,
startTime,
System.currentTimeMillis(),
triggerTime,
fireTime,
)
publisher.publishEvent(triggerEvent)
cc.updateLatency(System.currentTimeMillis() - triggerTime)
cc.updateLatency(System.currentTimeMillis() - fireTime)
}
}
}
Expand All @@ -153,7 +155,6 @@ class DefaultJobScheduler(
val triggerContext = JobTriggerContext(
job = job,
fireTime = System.currentTimeMillis(),
prevFireTime = job.lastTriggerTime,
)
trigger(triggerContext, triggerType, retryCount, jobParam, shardingParam)
}
Expand Down Expand Up @@ -181,11 +182,13 @@ class DefaultJobScheduler(
val shardingParam = if (routeStrategy == RouteStrategyEnum.SHARDING_BROADCAST) "$index/$total" else null

// 1. 保存日志
val fireTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
val jobLog = JobLog(
jobId = job.id.orEmpty(),
groupId = group.id.orEmpty(),
triggerType = triggerType.code(),
triggerTime = LocalDateTime.now(),
scheduledFireTime = Instant.ofEpochMilli(fireTime).atZone(ZoneId.systemDefault()).toLocalDateTime(),
)
val logId = jobManager.addJobLog(jobLog)
// 2. 构造trigger param
Expand All @@ -197,6 +200,7 @@ class DefaultJobScheduler(
jobTimeout = job.jobTimeout,
logId = logId,
triggerTime = jobLog.triggerTime,
scheduledFireTime = jobLog.scheduledFireTime,
broadcastIndex = index,
broadcastTotal = total,
updateTime = job.updateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.tencent.devops.schedule.enums.JobModeEnum.SHELL
import com.tencent.devops.schedule.handler.K8sShellHandler
import com.tencent.devops.schedule.handler.ShellHandler
import com.tencent.devops.schedule.k8s.K8sHelper
import com.tencent.devops.schedule.pojo.ScheduleResponse
import com.tencent.devops.schedule.pojo.trigger.TriggerParam
import com.tencent.devops.schedule.thread.JobThread
import com.tencent.devops.schedule.thread.JobThreadGroup
Expand All @@ -18,7 +19,6 @@ import com.tencent.devops.web.util.SpringContextHolder
import org.slf4j.LoggerFactory
import org.springframework.beans.BeansException
import org.springframework.beans.factory.DisposableBean
import java.lang.IllegalStateException
import java.util.concurrent.ConcurrentHashMap

/**
Expand All @@ -34,7 +34,7 @@ class DefaultJobExecutor(
private val threadGroup = JobThreadGroup(workerProperties.executor.threads, serverRpcClient)
private val jobThreadRepository = ConcurrentHashMap<String, JobThread>()

override fun execute(param: TriggerParam) {
override fun execute(param: TriggerParam): ScheduleResponse {
val jobId = param.jobId
val logId = param.logId
logger.debug("prepare to execute job[$jobId], log[$logId]: {}", param)
Expand Down Expand Up @@ -68,7 +68,7 @@ class DefaultJobExecutor(
when (blockStrategy) {
BlockStrategyEnum.DISCARD_LATER -> {
if (jobThread.hasRunningJobs(jobId)) {
throw IllegalStateException("discard task $logId by block strategy[DISCARD_LATER]")
return ScheduleResponse.failed("discard by block strategy")
}
}

Expand All @@ -87,9 +87,11 @@ class DefaultJobExecutor(
}
val task = TriggerTask(jobId, handler, param)
jobThread.pushTriggerQueue(task)
return ScheduleResponse.success()
}

override fun destroy() {
logger.info("Destroying DefaultJobExecutor")
threadGroup.close()
jobThreadRepository.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ data class JobContext(
* 本次任务触发时间
*/
var triggerTime: LocalDateTime,

/**
* 本次任务调度时间
* */
var scheduledFireTime: LocalDateTime,
/**
* 任务更新时间
* */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.devops.schedule.executor

import com.tencent.devops.schedule.pojo.ScheduleResponse
import com.tencent.devops.schedule.pojo.trigger.TriggerParam

/**
Expand All @@ -10,5 +11,5 @@ interface JobExecutor {
* 提交任务
* @param param 任务触发参数
*/
fun execute(param: TriggerParam)
fun execute(param: TriggerParam): ScheduleResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
}

fun toStop() {
logger.info("Stopping $name")
stop.set(true)
}

Expand All @@ -105,7 +104,7 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
if (logId == stopLogId) {
cancelJobs.remove(task.jobId)
}
val result = JobExecutionResult.failed("cancelled by block strategy[COVER_EARLY]")
val result = JobExecutionResult.failed("cancelled by block strategy")
result.logId = logId
submitResult(task.jobId, result)
return false
Expand All @@ -118,6 +117,7 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
jobParamMap = jobParam.readJsonString(),
logId = logId,
triggerTime = triggerTime,
scheduledFireTime = scheduledFireTime,
broadcastIndex = broadcastIndex,
broadcastTotal = broadcastTotal,
source = if (source != null) String(base64Decoder.decode(param.source)) else null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class JobThreadGroup(nThreads: Int, serverRpcClient: ServerRpcClient) : AutoClos
override fun close() {
threads.forEach {
it.toStop()
it.interrupt()
it.join()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class WorkerRpcController(
override fun runJob(@RequestBody param: TriggerParam): ScheduleResponse {
return try {
jobExecutor.execute(param)
ScheduleResponse.success()
} catch (e: Exception) {
logger.error("execute job[$param] error: ${e.message}", e)
ScheduleResponse.failed(e.message.orEmpty())
Expand Down

0 comments on commit cbff03a

Please sign in to comment.