Skip to content

Commit 8217899

Browse files
author
Jeffrey Koehler
committed
divider fixes
1 parent 9b940fb commit 8217899

File tree

6 files changed

+53
-83
lines changed

6 files changed

+53
-83
lines changed

internal/redisstreams/streams.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (s *StreamListener) Listen() {
3737
if errors.Is(err, context.Canceled) {
3838
return
3939
}
40-
s.Logger(s.Ctx).Panic("failed to read data from stream", slog.String("err.error", err.Error()), slog.String("streamlistener.stream", s.Key))
40+
s.Logger(s.Ctx).Panic("failed to read data from stream", err, slog.String("streamlistener.stream", s.Key))
4141
}
4242

4343
//for all events, get teh data and send it to the channel.

internal/ticker/ticker.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ticker
33
import (
44
"context"
55
"fmt"
6-
"log/slog"
76
"time"
87

98
"github.com/streemtech/divider"
@@ -33,14 +32,15 @@ func (t TickerFunc) Do() error {
3332
ticker := time.NewTicker(t.D)
3433

3534
go func() {
36-
defer func() {
37-
ticker.Stop()
38-
if e := recover(); e != nil {
39-
if t.Logger != nil {
40-
t.Logger(t.C).Error("Ran into exception when running ticker function", slog.String("err.panic", fmt.Sprintf("%+v", e)))
41-
}
42-
}
43-
}()
35+
//Intentionally disabling the panic catch as the ticker will be disabled, which causes unexpected problems.
36+
// defer func() {
37+
// ticker.Stop()
38+
// if e := recover(); e != nil {
39+
// if t.Logger != nil {
40+
// t.Logger(t.C).Error("Ran into exception when running ticker function", slog.String("err.panic", fmt.Sprintf("%+v", e)))
41+
// }
42+
// }
43+
// }()
4444
for {
4545
select {
4646
case <-ticker.C:

redisconsistent/metrics.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ var WorkerRectifyTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
5252
Help: "Returns the length of time that rectifying the work assigned to this node took",
5353
Buckets: HistogramBuckets,
5454
}, []string{"divider"})
55-
var MasterPingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
56-
Namespace: "streemtech",
57-
Subsystem: "redis_work_divider",
58-
Name: "master_ping_time",
59-
Help: "Returns the length of time that updating the master took.",
60-
Buckets: HistogramBuckets,
61-
}, []string{"divider"})
6255
var WorkerPingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
6356
Namespace: "streemtech",
6457
Subsystem: "redis_work_divider",
@@ -161,11 +154,6 @@ func InitMetrics(name string) {
161154
panic(errors.Wrap(err, "failed to get metric"))
162155
}
163156

164-
_, err = MasterPingTime.GetMetricWithLabelValues(name)
165-
if err != nil {
166-
panic(errors.Wrap(err, "failed to get metric"))
167-
}
168-
169157
_, err = WorkerPingTime.GetMetricWithLabelValues(name)
170158
if err != nil {
171159
panic(errors.Wrap(err, "failed to get metric"))

redisconsistent/redis.go

Lines changed: 40 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log/slog"
88
"time"
99

10+
// "github.com/k0kubun/pp/v3"
1011
"github.com/pkg/errors"
1112

1213
"github.com/streemtech/divider"
@@ -80,18 +81,6 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
8081
D: d.conf.compareKeys,
8182
F: d.workerRectifyAssignedWorkFunc,
8283
}
83-
d.masterPing = ticker.TickerFunc{
84-
C: d.ctx,
85-
Logger: logger,
86-
D: d.conf.masterPing,
87-
F: d.masterPingFunc,
88-
}
89-
d.workerPing = ticker.TickerFunc{
90-
C: d.ctx,
91-
Logger: logger,
92-
D: d.conf.workerPing,
93-
F: d.workerPingFunc,
94-
}
9584

9685
InitMetrics(d.conf.metricsName)
9786

@@ -103,7 +92,7 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
10392
//add workers to work holder
10493
err := d.storage.UpdateTimeoutForWorkers(ctx, d.getWorkerNodeKeys())
10594
if err != nil {
106-
d.conf.logger(ctx).Panic("failed to update timeout for workers", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
95+
d.conf.logger(ctx).Panic("failed to update timeout for workers", err, slog.String("divider.id", d.conf.instanceID))
10796
}
10897

10998
//start all listeners and tickers.
@@ -114,23 +103,13 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
114103

115104
err = d.masterUpdateRequiredWork.Do()
116105
if err != nil {
117-
d.conf.logger(ctx).Panic("failed to start ticker", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
106+
d.conf.logger(ctx).Panic("failed to start ticker", err, slog.String("divider.id", d.conf.instanceID))
118107
}
119108
err = d.workerRectifyAssignedWork.Do()
120109
if err != nil {
121-
d.conf.logger(ctx).Panic("failed to start ticker", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
122-
}
123-
err = d.masterPing.Do()
124-
if err != nil {
125-
d.conf.logger(ctx).Panic("failed to start ticker", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
126-
}
127-
err = d.workerPing.Do()
128-
if err != nil {
129-
d.conf.logger(ctx).Panic("failed to start ticker", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
110+
d.conf.logger(ctx).Panic("failed to start ticker", err, slog.String("divider.id", d.conf.instanceID))
130111
}
131112

132-
d.masterPing.F()
133-
d.workerPing.F()
134113
d.masterUpdateRequiredWork.F()
135114
d.workerRectifyAssignedWork.F()
136115

@@ -309,52 +288,53 @@ func (d *dividerWorker) masterUpdateRequiredWorkFunc() {
309288
}()
310289

311290
d.conf.logger(d.ctx).Debug("masterUpdateRequiredWorkFunc triggered", slog.String("divider.id", d.conf.instanceID))
312-
masterKey := fmt.Sprintf("%s:%s", d.conf.rootKey, "master")
313291

314-
//check the master key.
315-
master, err := d.client.Get(d.ctx, masterKey).Result()
316-
if err != nil {
317-
d.conf.logger(d.ctx).Panic("Error getting current master", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
318-
return
319-
}
320292
//if not master, dont do the work fetcher.
321-
if master != d.conf.instanceID {
293+
if !d.materUpdate(d.ctx) {
322294
return
323295
}
324296

325297
//Get all the newWork that needs to be assigned.
326298
newWork, err := d.conf.workFetcher(d.ctx)
327299
if err != nil {
328-
d.conf.logger(d.ctx).Panic("failed to execute work fetcher", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
300+
d.conf.logger(d.ctx).Panic("failed to execute work fetcher", err, slog.String("divider.id", d.conf.instanceID))
329301
return
330302
}
331303

332304
//Add the work to the list of work in the system.
333305
err = d.storage.AddWorkToDividedWork(d.ctx, newWork)
334306
if err != nil {
335-
d.conf.logger(d.ctx).Panic("failed to add work to divided work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
307+
d.conf.logger(d.ctx).Panic("failed to add work to divided work", err, slog.String("divider.id", d.conf.instanceID))
336308
return
337309
}
338310

339311
//get existing work
340312
existingWork, err := d.storage.GetAllWork(d.ctx)
341313
if err != nil {
342-
d.conf.logger(d.ctx).Panic("failed to get list of all work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
314+
d.conf.logger(d.ctx).Panic("failed to get list of all work", err, slog.String("divider.id", d.conf.instanceID))
343315
return
344316
}
345317

346318
//determine what work to remove
347319
_, remove := getToRemoveToKeep(existingWork, newWork)
348320

321+
// pp.Println(d.conf.metricsName, existingWork, newWork, remove)
349322
//remove all that work
350323
err = d.storage.RemoveWorkFromDividedWork(d.ctx, remove)
351324
if err != nil {
352-
d.conf.logger(d.ctx).Panic("failed to remove the old work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
325+
d.conf.logger(d.ctx).Panic("failed to remove the old work", err, slog.String("divider.id", d.conf.instanceID))
353326
return
354327
}
355328

329+
for _, v := range remove {
330+
err = d.removeWork.Publish(d.ctx, v)
331+
if err != nil {
332+
d.conf.logger(d.ctx).Panic("failed to publish old work removal", err, slog.String("divider.id", d.conf.instanceID))
333+
}
334+
}
356335
//note: Not triggering the add work event as that event should be triggered manually by the add work call, not by this.
357336
//The work will be picked up by the rectify call later.
337+
//Am manually calling the remove work as that should happen as soon as rectified if needed. (This can result in a double call, but thats ok in this case.)
358338
}
359339

360340
// get work assigned to this node, compare with known work, and start/stop all work as needed.
@@ -369,56 +349,56 @@ func (d *dividerWorker) workerRectifyAssignedWorkFunc() {
369349
if err != nil {
370350
d.conf.logger(d.ctx).Error("failed to rectify work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
371351
}
352+
353+
d.workerPing(d.ctx)
372354
}
373355

374356
// set master as still attached
375-
func (d *dividerWorker) masterPingFunc() {
376-
start := time.Now()
377-
defer func() {
378-
ObserveDuration(MasterPingTime, d.conf.metricsName, time.Since(start))
379-
}()
357+
func (d *dividerWorker) materUpdate(ctx context.Context) (isMaster bool) {
380358

381359
// d.conf.logger(d.ctx).Debug("masterPingFunc triggered")
382360
masterKey := fmt.Sprintf("%s:%s", d.conf.rootKey, "master")
383361
//set the master key to this value if it does not exist.
384-
set, err := d.client.SetNX(d.ctx, masterKey, d.conf.instanceID, d.conf.masterTimeout).Result()
362+
set, err := d.client.SetNX(ctx, masterKey, d.conf.instanceID, d.conf.masterTimeout).Result()
385363
if err != nil {
386-
d.conf.logger(d.ctx).Panic("Error updating node master", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
364+
d.conf.logger(ctx).Panic("Error updating node master", err, slog.String("divider.id", d.conf.instanceID))
387365
return
388366
}
389367

390368
if set {
391-
d.conf.logger(d.ctx).Info("Master set to this node", slog.String("divider.id", d.conf.instanceID))
369+
d.conf.logger(ctx).Info("Master set to this node", slog.String("divider.id", d.conf.instanceID))
392370
}
393371

394372
//check the master key.
395-
master, err := d.client.Get(d.ctx, masterKey).Result()
373+
master, err := d.client.Get(ctx, masterKey).Result()
396374
if err != nil {
397-
d.conf.logger(d.ctx).Panic("Error getting current master", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
375+
d.conf.logger(ctx).Panic("Error getting current master", err, slog.String("divider.id", d.conf.instanceID))
398376
return
399377
}
400378

401379
//if this is the master, run the update to keep the master inline.
402380
if master == d.conf.instanceID {
403-
_, err = d.client.Set(d.ctx, masterKey, d.conf.instanceID, d.conf.masterTimeout).Result()
381+
_, err = d.client.Set(ctx, masterKey, d.conf.instanceID, d.conf.masterTimeout).Result()
404382
if err != nil {
405-
d.conf.logger(d.ctx).Panic("Error updating master timeout", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
383+
d.conf.logger(ctx).Panic("Error updating master timeout", err, slog.String("divider.id", d.conf.instanceID))
406384
return
407385
}
386+
return true
408387
}
388+
return false
409389
}
410390

411391
// update nodes in storage as still attached.
412-
func (d *dividerWorker) workerPingFunc() {
392+
func (d *dividerWorker) workerPing(ctx context.Context) {
413393
start := time.Now()
414394
defer func() {
415395
ObserveDuration(WorkerPingTime, d.conf.metricsName, time.Since(start))
416396
}()
417397
// d.conf.logger(d.ctx).Debug("workerPingFunc triggered")
418398
//add workers to work holder
419-
err := d.storage.UpdateTimeoutForWorkers(d.ctx, d.getWorkerNodeKeys())
399+
err := d.storage.UpdateTimeoutForWorkers(ctx, d.getWorkerNodeKeys())
420400
if err != nil {
421-
d.conf.logger(d.ctx).Panic("failed to update timeout for workers", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
401+
d.conf.logger(d.ctx).Panic("failed to update timeout for workers", err, slog.String("divider.id", d.conf.instanceID))
422402
}
423403
}
424404

@@ -474,19 +454,23 @@ func (d *dividerWorker) rectifyWork(ctx context.Context) (err error) {
474454

475455
// pp.Println(d.conf.instanceID, "toRemove", toRemove.Array())
476456
for key := range toRemove.Iterator() {
477-
err = d.conf.stopper(ctx, key)
457+
ctx2, can := context.WithTimeout(ctx, time.Second*5)
458+
defer can()
459+
err = d.conf.stopper(ctx2, key)
478460
if err != nil {
479-
d.conf.logger(ctx).Error("failed to execute stopper, not removing from known work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
461+
d.conf.logger(ctx2).Error("failed to execute stopper, not removing from known work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
480462
continue
481463
}
482464
d.knownWork.Remove(key)
483465
}
484466

485467
// pp.Println(d.conf.instanceID, "toAdd", toAdd.Array())
486468
for key := range toAdd.Iterator() {
487-
err = d.conf.starter(ctx, key)
469+
ctx2, can := context.WithTimeout(ctx, time.Second*5)
470+
defer can()
471+
err = d.conf.starter(ctx2, key)
488472
if err != nil {
489-
d.conf.logger(ctx).Error("failed to execute starter, not adding to known work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
473+
d.conf.logger(ctx2).Error("failed to execute starter, not adding to known work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
490474
continue
491475
}
492476
d.knownWork.Add(key)

redisconsistent/types.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,4 @@ type dividerWorker struct {
3030

3131
masterUpdateRequiredWork ticker.TickerFunc
3232
workerRectifyAssignedWork ticker.TickerFunc
33-
masterPing ticker.TickerFunc
34-
workerPing ticker.TickerFunc
3533
}

util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Logger interface {
3636
Debug(msg string, args ...slog.Attr)
3737
Info(msg string, args ...slog.Attr)
3838
Error(msg string, args ...slog.Attr)
39-
Panic(msg string, args ...slog.Attr)
39+
Panic(msg string, err error, args ...slog.Attr)
4040
}
4141

4242
func DefaultLogger(ctx context.Context) Logger {
@@ -71,8 +71,8 @@ func (d *defaultLogger) Error(msg string, args ...slog.Attr) {
7171

7272
l.ErrorContext(d.ctx, msg)
7373
}
74-
func (d *defaultLogger) Panic(msg string, args ...slog.Attr) {
75-
l := slog.Default()
74+
func (d *defaultLogger) Panic(msg string, err error, args ...slog.Attr) {
75+
l := slog.Default().With(slog.String("err.error", err.Error()))
7676
for _, v := range args {
7777
l = l.With(v)
7878
}

0 commit comments

Comments
 (0)