-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsanji.go
109 lines (90 loc) · 1.92 KB
/
sanji.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package sanji
import (
"log"
"os"
"sync"
"github.com/bitleak/lmstfy/client"
)
// Logger is the package-level logger. It writes to standard output, prefixes
// each entry with "workers: ", and stamps it with the date and a
// microsecond-resolution time of day.
var Logger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)
// Luffy owns a set of per-queue managers and the middleware chain they share.
// Values are built by New / NewWithConfig.
type Luffy struct {
	// client is the lmstfy client taken from the configured Config.
	client *client.LmstfyClient
	// config is the value returned by Configure in NewWithConfig.
	config *Config
	// started is true between a successful start and the next Quit.
	started bool
	// handlers is the middleware chain; Use appends to it.
	handlers HandlersChain
	// managers maps a queue name to the manager registered by Process.
	managers map[string]*manager
	// access is locked by start and Quit around their use of started.
	access sync.Mutex
}
// PriorityLevel is an integer type for priority tiers.
//
// NOTE(review): the constants below are untyped integer constants, not
// PriorityLevel values; they are left untyped so existing callers that use
// them as plain ints keep compiling.
type PriorityLevel int

// Priority tiers, numbered from 0 (high) to 2 (low).
const (
	PriorityLevelHigh = iota
	PriorityLevelMid
	PriorityLevelLow
)

// PriorityLevelHight is the original, misspelled name of PriorityLevelHigh.
// It has the same value and is retained so existing callers keep working.
//
// Deprecated: use PriorityLevelHigh.
const PriorityLevelHight = PriorityLevelHigh
// New returns a Luffy for the lmstfy server at host:port, authenticating with
// token inside namespace. It delegates to NewWithConfig with a poll interval
// of 10.
//
// NOTE(review): the unit of the poll interval is not visible here — confirm
// against Config.PollInterval.
func New(token, namespace, host string, port int) *Luffy {
	const defaultPollInterval = 10
	return NewWithConfig(token, namespace, host, port, defaultPollInterval)
}
// NewWithConfig returns a Luffy whose Config is built from the arguments and
// passed through Configure. The returned value uses the client held by that
// configuration, has no registered managers, and starts with the logger and
// recovery middleware installed, in that order.
func NewWithConfig(token, namespace, host string, port int, poll_interval int) *Luffy {
	settings := &Config{
		Token:        token,
		Namespace:    namespace,
		Host:         host,
		Port:         port,
		PollInterval: poll_interval,
	}
	cf := Configure(settings)

	return &Luffy{
		client:   cf.client,
		config:   cf,
		managers: make(map[string]*manager),
		handlers: HandlersChain{logger_middleware, recovery_middleware},
	}
}
// Use appends the given middleware to the end of the handler chain, after any
// middleware that is already installed. It does not take the access lock.
func (s *Luffy) Use(mids ...JobFunc) {
	s.handlers = append(s.handlers, mids...)
}
// Process creates a manager for queue with the given job, time-to-run,
// concurrency and priority flag, stores it under the queue name, and returns
// it. A manager previously stored under the same queue name is replaced.
// Process only registers the manager; it does not start it.
func (s *Luffy) Process(queue string, job JobFunc, ttr uint32, concurrency int, is_priority bool) imanager {
	// Named mgr rather than manager so the local does not shadow the type.
	mgr := newManager(s, queue, job, concurrency, is_priority, ttr)
	s.managers[queue] = mgr
	return mgr
}
// Run starts the registered managers, launches signal handling in its own
// goroutine, and then blocks until every manager's wait has returned.
func (s *Luffy) Run() {
	s.start()

	// Signal handling runs concurrently so this goroutine is free to block
	// below.
	go s.handleSignals()

	s.waitForExit()
}
// start starts every registered manager and marks the Luffy as started. It
// holds the access lock throughout and does nothing if already started.
func (s *Luffy) start() {
	s.access.Lock()
	defer s.access.Unlock()

	if !s.started {
		s.startManagers()
		s.started = true
	}
}
// startManagers calls start on each registered manager, one after another in
// map iteration order.
func (s *Luffy) startManagers() {
	for _, mgr := range s.managers {
		mgr.start()
	}
}
// Quit asks every manager to quit, waits for all of them to finish, and then
// clears the started flag. It does nothing if the Luffy is not started. The
// access lock is held for the entire call, including the wait.
func (s *Luffy) Quit() {
	s.access.Lock()
	defer s.access.Unlock()

	if s.started {
		s.quitManagers()
		s.waitForExit()
		s.started = false
	}
}
// quitManagers calls quit on every registered manager, each in its own
// goroutine, and returns without waiting for those calls to complete.
func (s *Luffy) quitManagers() {
	for _, mgr := range s.managers {
		// The receiver is bound when the go statement executes, so each
		// goroutine operates on its own manager.
		go mgr.quit()
	}
}
// waitForExit blocks until Wait has returned on the wait field of every
// registered manager, visiting the managers one at a time.
func (s *Luffy) waitForExit() {
	for _, mgr := range s.managers {
		mgr.wait.Wait()
	}
}