-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis_listener.go
106 lines (96 loc) · 2.45 KB
/
redis_listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package container_monitor
import (
"gopkg.in/redis.v4"
"log"
)
type RedisListener struct {
Client *redis.Client // Redis client
monitor *ContainerMonitor // Container monitor.
}
// Returns new instance of Redis listener.
// props: r_url string Redis server URL.
// password string Redis server password.
// db int Redis server Data Base ID.
func NewRedisListener(r_url string, password string, db int) *RedisListener {
return &RedisListener{
Client: redis.NewClient(&redis.Options{
Addr: r_url,
Password: password,
DB: db,
}),
}
}
// Listens Redis pub/sub channel.
func (l *RedisListener) Listen() {
defer l.Client.Close()
pubsub, err := l.Client.Subscribe(STRESS_TEST_CHANNEL)
if err != nil {
log.Printf("CLIENT SUBSCRIBE ERROR: %s", err.Error())
return
}
defer pubsub.Unsubscribe(STRESS_TEST_CHANNEL)
for {
err := l.ping()
if err != nil {
log.Printf("CLIENT PING ERROR: %s", err.Error())
return
}
mess, err := pubsub.ReceiveMessage()
if err == nil {
l.readRedisMessage(mess)
}
}
}
// Calls redis pub/sub channel.
func (l *RedisListener) Call(test_id string, command string) {
mess := newRedisMessage(command, test_id)
message_string, err := marshalRedisMessage(mess)
if err != nil {
log.Printf("can not create start redis message %s", err.Error())
return
}
l.Client.Publish(STRESS_TEST_CHANNEL, message_string)
}
// Closes listener
func (l *RedisListener) Close() {
l.Client.Close()
}
// Reads Redis pub/sub messages.
func (l *RedisListener) readRedisMessage(mess *redis.Message) {
message, err := unmarshalRedisMessage(mess.Payload)
if err != nil {
log.Printf("Can not unmarshall redis message: %s", err.Error())
return
}
if message.Command == START_COMMAND {
l.startTest(message.TestID)
} else {
l.stopTest(message.TestID)
}
}
// Starts gathering information about the container system.
func (l *RedisListener) startTest(test_id string) {
if l.monitor != nil {
log.Println("ERROR: last test not finiched!")
return
}
l.monitor = newContainerMonitor(l.Client, test_id)
go l.monitor.Run()
}
// Stops gathering information about the container system.
func (l *RedisListener) stopTest(test_id string) {
if l.monitor == nil {
log.Println("ERROR: test not started!")
return
}
l.monitor.Stop()
l.monitor = nil
}
// Pings redis pub/sub channel.
func (l *RedisListener) ping() error {
err := l.Client.Ping().Err()
if err != nil {
return err
}
return nil
}