forked from shiny/adonis-resque
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.ts
89 lines (85 loc) · 2.3 KB
/
scheduler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import { Scheduler } from 'node-resque'
import { getConnection } from './services/main.js';
import { NodeResqueJob } from './types.js';
import Cron from 'croner'
import ms from 'ms'
/**
 * Build a node-resque `Scheduler` bound to the connection returned by
 * `getConnection()`.
 * @docs https://github.com/actionhero/node-resque?tab=readme-ov-file#scheduler
 * @returns a newly constructed `Scheduler` instance
 */
export function createScheduler() {
  const connection = getConnection()
  return new Scheduler({ connection })
}
/**
 * Handle of a started schedule: either a Node.js timer (as returned by
 * `setInterval`) or a croner `Cron` instance.
 */
export type Interval = NodeJS.Timeout | Cron
/**
 * Start the recurring schedules declared on the given jobs.
 *
 * For every job, a croner task is created when `job.cron` is set and a
 * `setInterval` timer is created when `job.interval` is set. On each tick the
 * job is enqueued only if `resqueScheduler.leader` is truthy at that moment.
 *
 * @param resqueScheduler scheduler whose `leader` flag gates every enqueue
 * @param jobs map of jobs; only the `job` property of each value is read
 * @returns the handles of every started schedule, in creation order
 * @throws RangeError when `job.interval` does not resolve to a positive,
 *   finite number of milliseconds. Any error raised during setup (including
 *   errors thrown by `Cron` or `ms`) stops the schedules that were already
 *   started before it is rethrown, so no timer is left running unowned.
 */
export async function startJobSchedules(resqueScheduler: Scheduler, jobs: Record<string, NodeResqueJob>): Promise<Interval[]> {
  const intervals: Interval[] = []

  /**
   * Enqueue the job if this scheduler is currently the leader.
   * Failures are logged instead of propagated: this runs inside timer
   * callbacks, where a rejected promise has no caller to handle it.
   * @param job
   */
  const enqueueIfLeader = async (job: NodeResqueJob['job']): Promise<void> => {
    if (!resqueScheduler.leader) {
      return
    }
    try {
      await job.enqueue()
    } catch (error) {
      console.error('[resque scheduler] failed to enqueue scheduled job', error)
    }
  }

  /**
   * Create a croner task if job.cron exists
   * @param job
   * @returns the croner instance, or undefined when the job has no cron pattern
   */
  const createCronerFor = (job: NodeResqueJob['job']) => {
    if (!job.cron) {
      return undefined
    }
    return Cron(job.cron, () => enqueueIfLeader(job))
  }

  /**
   * Let the job repeat every ${job.interval}
   * @param job
   * @returns the timer handle, or undefined when the job has no interval
   */
  const createRepeaterFor = (job: NodeResqueJob['job']) => {
    if (!job.interval) {
      return undefined
    }
    const milliseconds = typeof job.interval === 'number' ? job.interval : ms(job.interval)
    // setInterval coerces an invalid delay (undefined, NaN, negative) to 1ms,
    // which would enqueue the job in a tight loop; reject such values instead.
    if (typeof milliseconds !== 'number' || !Number.isFinite(milliseconds) || milliseconds <= 0) {
      throw new RangeError(`Invalid job interval: ${String(job.interval)}`)
    }
    return setInterval(() => {
      void enqueueIfLeader(job)
    }, milliseconds)
  }

  try {
    for (const { job } of Object.values(jobs)) {
      const croner = createCronerFor(job)
      if (croner) {
        intervals.push(croner)
      }
      const intervalId = createRepeaterFor(job)
      if (intervalId) {
        intervals.push(intervalId)
      }
    }
  } catch (error) {
    // Setup failed part-way: stop what was already started, since the caller
    // never receives these handles and could not cancel them.
    for (const started of intervals) {
      if (started instanceof Cron) {
        started.stop()
      } else {
        clearInterval(started)
      }
    }
    throw error
  }

  return intervals
}
/**
 * Stop every schedule handle in the given list.
 *
 * Croner instances are stopped through their `stop()` method; every other
 * handle is passed to `clearInterval`. Does nothing when no list is given.
 *
 * @param intervals handles to cancel
 */
export function cancelSchedules(intervals?: Interval[]) {
  for (const handle of intervals ?? []) {
    if (!(handle instanceof Cron)) {
      clearInterval(handle)
      continue
    }
    handle.stop()
  }
}