Skip to content

Commit e039805

Browse files
author
Jeffrey Koehler
committed
make divider listeners be initialized on creation, not just start worker
1 parent 8217899 commit e039805

File tree

2 files changed

+41
-52
lines changed

2 files changed

+41
-52
lines changed

redisconsistent/config.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package redisconsistent
22

33
import (
4+
"fmt"
45
"time"
56

67
"github.com/google/uuid"
78
"github.com/redis/go-redis/v9"
89

910
"github.com/streemtech/divider"
11+
"github.com/streemtech/divider/internal/redisstreams"
1012
"github.com/streemtech/divider/internal/set"
1113
)
1214

@@ -20,13 +22,9 @@ type dividerConf struct {
2022
starter StarterFunc
2123
stopper StarterFunc
2224

23-
//how often the master updates itself
24-
masterPing time.Duration
2525
//how long the master reserves itself.
2626
masterTimeout time.Duration
2727

28-
//how long the workers wait between pings in the storage
29-
workerPing time.Duration
3028
//how long the workers wait before counting a timeout.
3129
workerTimeout time.Duration
3230

@@ -95,27 +93,13 @@ func WithMasterTimeoutDuration(value time.Duration) DividerOpt {
9593
}
9694
}
9795

98-
// Time that masters will wait between updating their "im alive" statuses.
99-
func WithMasterPingTime(value time.Duration) DividerOpt {
100-
return func(dc *dividerConf) {
101-
dc.masterPing = value
102-
}
103-
}
104-
10596
// Time that a worker can be in the list after a ping and have not reported.
10697
func WithWorkerTimeoutDuration(value time.Duration) DividerOpt {
10798
return func(dc *dividerConf) {
10899
dc.workerTimeout = value
109100
}
110101
}
111102

112-
// Time that workers will wait between updating their "im alive" statuses.
113-
func WithWorkerPingTime(value time.Duration) DividerOpt {
114-
return func(dc *dividerConf) {
115-
dc.workerPing = value
116-
}
117-
}
118-
119103
// time between checking for the full list of work.
120104
func WithUpdateAssignmentsDuration(value time.Duration) DividerOpt {
121105
return func(dc *dividerConf) {
@@ -138,9 +122,7 @@ func New(client redis.UniversalClient, rootKey string, Opts ...DividerOpt) (divi
138122
nodeCount: 10,
139123
logger: divider.DefaultLogger,
140124
masterTimeout: time.Second * 10,
141-
masterPing: time.Second,
142125
workerTimeout: time.Second * 10,
143-
workerPing: time.Second,
144126

145127
updateAssignments: time.Second * 10,
146128
compareKeys: time.Second * 10,
@@ -161,5 +143,34 @@ func New(client redis.UniversalClient, rootKey string, Opts ...DividerOpt) (divi
161143
workerTimeout: conf.workerTimeout,
162144
rootKey: conf.rootKey,
163145
},
146+
//start tickers and listeners
147+
newWorker: redisstreams.StreamListener{
148+
// Ctx: ctx,
149+
Client: client,
150+
Key: fmt.Sprintf("%s:%s", conf.rootKey, "new_worker"),
151+
// Callback: d.newWorkerEvent,
152+
Logger: conf.logger,
153+
},
154+
removeWorker: redisstreams.StreamListener{
155+
// Ctx: ctx,
156+
Client: client,
157+
Key: fmt.Sprintf("%s:%s", conf.rootKey, "remove_worker"),
158+
// Callback: d.removeWorkerEvent,
159+
Logger: conf.logger,
160+
},
161+
newWork: redisstreams.StreamListener{
162+
// Ctx: ctx,
163+
Client: client,
164+
Key: fmt.Sprintf("%s:%s", conf.rootKey, "new_work"),
165+
// Callback: d.newWorkEvent,
166+
Logger: conf.logger,
167+
},
168+
removeWork: redisstreams.StreamListener{
169+
// Ctx: ctx,
170+
Client: client,
171+
Key: fmt.Sprintf("%s:%s", conf.rootKey, "remove_work"),
172+
// Callback: d.removeWorkEvent,
173+
Logger: conf.logger,
174+
},
164175
}, nil
165176
}

redisconsistent/redis.go

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/pkg/errors"
1212

1313
"github.com/streemtech/divider"
14-
"github.com/streemtech/divider/internal/redisstreams"
1514
"github.com/streemtech/divider/internal/set"
1615
"github.com/streemtech/divider/internal/ticker"
1716
)
@@ -38,46 +37,25 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
3837

3938
d.ctx, d.cancel = context.WithCancel(ctx)
4039

41-
var logger divider.LoggerGen
4240
//start tickers and listeners
43-
d.newWorker = redisstreams.StreamListener{
44-
Ctx: d.ctx,
45-
Client: d.client,
46-
Key: fmt.Sprintf("%s:%s", d.conf.rootKey, "new_worker"),
47-
Callback: d.newWorkerEvent,
48-
Logger: logger,
49-
}
50-
d.removeWorker = redisstreams.StreamListener{
51-
Ctx: d.ctx,
52-
Client: d.client,
53-
Key: fmt.Sprintf("%s:%s", d.conf.rootKey, "remove_worker"),
54-
Callback: d.removeWorkerEvent,
55-
Logger: logger,
56-
}
57-
d.newWork = redisstreams.StreamListener{
58-
Ctx: d.ctx,
59-
Client: d.client,
60-
Key: fmt.Sprintf("%s:%s", d.conf.rootKey, "new_work"),
61-
Callback: d.newWorkEvent,
62-
Logger: logger,
63-
}
64-
d.removeWork = redisstreams.StreamListener{
65-
Ctx: d.ctx,
66-
Client: d.client,
67-
Key: fmt.Sprintf("%s:%s", d.conf.rootKey, "remove_work"),
68-
Callback: d.removeWorkEvent,
69-
Logger: logger,
70-
}
41+
d.newWorker.Ctx = d.ctx
42+
d.newWorker.Callback = d.newWorkerEvent
43+
d.removeWorker.Ctx = d.ctx
44+
d.removeWorker.Callback = d.removeWorkerEvent
45+
d.newWork.Ctx = d.ctx
46+
d.newWork.Callback = d.newWorkEvent
47+
d.removeWork.Ctx = d.ctx
48+
d.removeWork.Callback = d.removeWorkEvent
7149

7250
d.masterUpdateRequiredWork = ticker.TickerFunc{
7351
C: d.ctx,
74-
Logger: logger,
52+
Logger: d.conf.logger,
7553
D: d.conf.updateAssignments,
7654
F: d.masterUpdateRequiredWorkFunc,
7755
}
7856
d.workerRectifyAssignedWork = ticker.TickerFunc{
7957
C: d.ctx,
80-
Logger: logger,
58+
Logger: d.conf.logger,
8159
D: d.conf.compareKeys,
8260
F: d.workerRectifyAssignedWorkFunc,
8361
}

0 commit comments

Comments
 (0)