This repository was archived by the owner on Nov 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathredis.go
145 lines (122 loc) · 3 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"encoding/json"
"fmt"
"sync"
// "github.com/datatogether/task-mgmt/tasks"
"github.com/garyburd/redigo/redis"
// "net"
"time"
)
// rconn is the package-level ("main") redis connection.
// NOTE(review): nothing visible in this file assigns it — the only writer is
// the commented-out connectRedis below, and SubscribeTaskProgress dials its
// own connection — so it is presumably nil unless set elsewhere; confirm
// before use.
var rconn redis.Conn
// func connectRedis() (err error) {
// var netConn net.Conn
// if cfg.RedisUrl == "" {
// return fmt.Errorf("no redis url specified")
// }
// for i := 0; i <= 1000; i++ {
// netConn, err = net.Dial("tcp", cfg.RedisUrl)
// if err != nil {
// log.Infoln("error connecting to redis: %s", err.Error())
// time.Sleep(time.Second)
// continue
// }
// break
// }
// if netConn == nil {
// return fmt.Errorf("no net connection after 1000 tries")
// }
// log.Infoln("connected to redis")
// rconn = redis.NewConn(netConn, time.Second*20, time.Second*20)
// return SubscribeTaskProgress(rconn)
// }
func SubscribeTaskProgress() (err error) {
var conn redis.Conn
if cfg.RedisUrl == "" {
return fmt.Errorf("no redis url specified")
}
for i := 0; i <= 1000; i++ {
conn, err = redis.Dial("tcp", cfg.RedisUrl,
redis.DialReadTimeout(10*time.Second),
redis.DialWriteTimeout(0),
redis.DialConnectTimeout(20*time.Second),
)
if err != nil {
log.Infoln("error connecting to redis: %s", err.Error())
time.Sleep(2 * time.Second)
continue
}
break
}
if conn == nil {
return fmt.Errorf("couldn't connect to redis after 1000 tries")
}
defer conn.Close()
var wg sync.WaitGroup
wg.Add(2)
log.Infoln("connected to redis")
psc := redis.PubSubConn{Conn: conn}
if err = psc.PSubscribe("tasks.*"); err != nil {
return err
}
defer psc.PUnsubscribe()
go func() {
defer wg.Done()
for {
switch v := psc.Receive().(type) {
case redis.Message:
// log.Infof("%s: message: %s\n", v.Channel, v.Data)
// TODO - other types of messages will eventually come through
// here...
res := &ClientResponse{
Type: "TASK_PROGRESS",
RequestId: "server",
Schema: "TASK",
Data: json.RawMessage(v.Data),
}
data, err := json.Marshal(res)
if err != nil {
log.Infoln(err.Error())
} else {
room.broadcast <- data
}
case redis.PMessage:
// log.Infof("PMessage: %s %s %s\n", v.Pattern, v.Channel, v.Data)
// TODO - other types of messages will eventually come through
// here...
res := &ClientResponse{
Type: "TASK_PROGRESS",
RequestId: "server",
Schema: "TASK",
Data: json.RawMessage(v.Data),
}
data, err := json.Marshal(res)
if err != nil {
log.Infoln(err.Error())
} else {
room.broadcast <- data
}
case redis.Pong:
// log.Infof("received pong")
case redis.Subscription:
log.Infof("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Infoln("connection error:", conn.Err())
log.Infoln("message error: %s", v.Error())
return
}
}
}()
go func() {
for {
if err := psc.Ping("PING"); err != nil {
log.Infoln("error sending ping")
return
}
time.Sleep(time.Second * 8)
}
}()
wg.Wait()
return nil
}