Skip to content

Commit b119432

Browse files
author
Jeffrey Koehler
committed
Update divider to remove itself from master if required, and trigger the update-master func on worker being removed.
1 parent e039805 commit b119432

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

redisconsistent/redis.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,13 @@ func (d *dividerWorker) StopWorker(ctx context.Context) {
114114
d.ctx = nil
115115
d.cancel = nil
116116

117+
err := d.removeMaster(ctx)
118+
if err != nil {
119+
d.conf.logger(ctx).Error("failed to remove workers as master", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
120+
}
121+
117122
//remove workers from work holder.
118-
err := d.storage.RemoveWorkers(ctx, d.getWorkerNodeKeys())
123+
err = d.storage.RemoveWorkers(ctx, d.getWorkerNodeKeys())
119124
if err != nil {
120125
d.conf.logger(ctx).Error("failed to remove workers from work list", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
121126
}
@@ -185,6 +190,9 @@ func (d *dividerWorker) GetWork(ctx context.Context) (iter.Seq[string], error) {
185190

186191
// force refresh of work, and if necessary, drop any work no-longer assigned to me.
187192
func (d *dividerWorker) newWorkerEvent(ctx context.Context, key string) {
193+
if d.cancel == nil {
194+
return
195+
}
188196
start := time.Now()
189197

190198
d.conf.logger(ctx).Debug("newWorkerEvent triggered: "+key, slog.String("divider.id", d.conf.instanceID))
@@ -198,9 +206,17 @@ func (d *dividerWorker) newWorkerEvent(ctx context.Context, key string) {
198206

199207
// grab new work and check if a new master needs to be created.
200208
func (d *dividerWorker) removeWorkerEvent(ctx context.Context, key string) {
209+
if d.cancel == nil {
210+
return
211+
}
201212
start := time.Now()
202213

203214
d.conf.logger(ctx).Debug("removeWorkerEvent triggered: "+key, slog.String("divider.id", d.conf.instanceID))
215+
216+
//trigger a force update of the master
217+
d.masterUpdateRequiredWorkFunc()
218+
219+
//rectify all of your work.
204220
err := d.rectifyWork(ctx)
205221
if err != nil {
206222
d.conf.logger(ctx).Error("failed to rectify work", slog.String("err.error", err.Error()), slog.String("divider.id", d.conf.instanceID))
@@ -211,6 +227,9 @@ func (d *dividerWorker) removeWorkerEvent(ctx context.Context, key string) {
211227

212228
// check if work belongs to me and if needed, start it.
213229
func (d *dividerWorker) newWorkEvent(ctx context.Context, key string) {
230+
if d.cancel == nil {
231+
return
232+
}
214233
start := time.Now()
215234

216235
d.conf.logger(ctx).Debug("newWorkEvent triggered: "+key, slog.String("divider.id", d.conf.instanceID))
@@ -240,6 +259,9 @@ func (d *dividerWorker) newWorkEvent(ctx context.Context, key string) {
240259

241260
// check if I am running the work, and if needed, remove it from my list of things to work on.
242261
func (d *dividerWorker) removeWorkEvent(ctx context.Context, key string) {
262+
if d.cancel == nil {
263+
return
264+
}
243265
start := time.Now()
244266

245267
d.conf.logger(ctx).Debug("removeWorkEvent triggered: "+key, slog.String("divider.id", d.conf.instanceID))
@@ -331,6 +353,28 @@ func (d *dividerWorker) workerRectifyAssignedWorkFunc() {
331353
d.workerPing(d.ctx)
332354
}
333355

356+
func (d *dividerWorker) removeMaster(ctx context.Context) (err error) {
357+
358+
masterKey := fmt.Sprintf("%s:%s", d.conf.rootKey, "master")
359+
360+
master, err := d.client.Get(ctx, masterKey).Result()
361+
if err != nil {
362+
d.conf.logger(ctx).Panic("Error getting current master", err, slog.String("divider.id", d.conf.instanceID))
363+
return errors.Wrap(err, "failed to get current master")
364+
}
365+
366+
//if this is the master, run the update to keep the master inline.
367+
if master == d.conf.instanceID {
368+
_, err = d.client.Del(ctx, masterKey).Result()
369+
370+
if err != nil {
371+
d.conf.logger(ctx).Panic("deleting master key", err, slog.String("divider.id", d.conf.instanceID))
372+
return errors.Wrap(err, "failed to delete master key")
373+
}
374+
}
375+
return nil
376+
}
377+
334378
// set master as still attached
335379
func (d *dividerWorker) materUpdate(ctx context.Context) (isMaster bool) {
336380

0 commit comments

Comments
 (0)