Skip to content

Commit 8417e59

Browse files
committed
implement restartDirectReplicas as response to UnreachablePrimary
Signed-off-by: Shlomi Noach <[email protected]>
1 parent 884aa5a commit 8417e59

File tree

1 file changed

+126
-1
lines changed

1 file changed

+126
-1
lines changed

go/vt/vtorc/logic/topology_recovery.go

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import (
2525
"sync/atomic"
2626
"time"
2727

28+
"github.com/patrickmn/go-cache"
29+
"golang.org/x/sync/errgroup"
30+
2831
"vitess.io/vitess/go/stats"
2932
"vitess.io/vitess/go/vt/log"
3033
"vitess.io/vitess/go/vt/logutil"
@@ -40,6 +43,7 @@ import (
4043

4144
const (
4245
CheckAndRecoverGenericProblemRecoveryName string = "CheckAndRecoverGenericProblem"
46+
RestartDirectReplicasRecoveryName string = "RestartDirectReplicas"
4347
RecoverDeadPrimaryRecoveryName string = "RecoverDeadPrimary"
4448
RecoverPrimaryTabletDeletedRecoveryName string = "RecoverPrimaryTabletDeleted"
4549
RecoverPrimaryHasPrimaryRecoveryName string = "RecoverPrimaryHasPrimary"
@@ -53,6 +57,11 @@ const (
5357
var (
5458
countPendingRecoveries = stats.NewGauge("PendingRecoveries", "Count of the number of pending recoveries")
5559

60+
// urgentOperations helps rate limiting some operations on replicas, such as restarting replication
61+
// in an UnreachablePrimary scenario.
62+
urgentOperations *cache.Cache // key: tablet alias. value: arbitrary (we don't care)
63+
urgentOperationsInterval = 1 * time.Minute
64+
5665
// detectedProblems is used to track the number of detected problems.
5766
//
5867
// When an issue is active it will be set to 1, when it is no longer active
@@ -95,6 +104,7 @@ type recoveryFunction int
95104
const (
96105
noRecoveryFunc recoveryFunction = iota
97106
recoverGenericProblemFunc
107+
restartDirectReplicasFunc
98108
recoverDeadPrimaryFunc
99109
recoverPrimaryTabletDeletedFunc
100110
recoverPrimaryHasPrimaryFunc
@@ -156,6 +166,7 @@ func init() {
156166
stats.NewGaugeFunc("ShardLocksActive", "Number of actively-held shard locks", func() int64 {
157167
return atomic.LoadInt64(&shardsLockCounter)
158168
})
169+
urgentOperations = cache.New(urgentOperationsInterval, 2*urgentOperationsInterval)
159170
go initializeTopologyRecoveryPostConfiguration()
160171
}
161172

@@ -340,6 +351,114 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl
340351
return false, nil, nil
341352
}
342353

354+
// restartDirectReplicas restarts replication on direct replicas of an unreachable primary
355+
func restartDirectReplicas(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, logger *log.PrefixedLogger) (bool, *TopologyRecovery, error) {
356+
topologyRecovery, err := AttemptRecoveryRegistration(analysisEntry)
357+
if topologyRecovery == nil {
358+
message := fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another restartDirectReplicas.", analysisEntry.AnalyzedInstanceAlias)
359+
logger.Warning(message)
360+
_ = AuditTopologyRecovery(topologyRecovery, message)
361+
return false, nil, err
362+
}
363+
logger.Infof("Analysis: %v, will restart direct replicas of unreachable primary %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
364+
365+
// This has to be done in the end; whether successful or not, we should mark that the recovery is done.
366+
defer func() {
367+
_ = resolveRecovery(topologyRecovery, nil)
368+
}()
369+
370+
// Get durability policy for the keyspace to determine semi-sync settings
371+
durabilityPolicy, err := inst.GetDurabilityPolicy(analysisEntry.AnalyzedKeyspace)
372+
if err != nil {
373+
logger.Errorf("Error getting durability policy for keyspace %v: %v", analysisEntry.AnalyzedKeyspace, err)
374+
return false, topologyRecovery, err
375+
}
376+
377+
// Get all tablets in the shard
378+
tablets, err := ts.GetTabletsByShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
379+
if err != nil {
380+
logger.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, err)
381+
return false, topologyRecovery, err
382+
}
383+
384+
// Find the primary tablet for semi-sync policy determination
385+
var primaryTablet *topodatapb.Tablet
386+
for _, tabletInfo := range tablets {
387+
tabletAlias := topoproto.TabletAliasString(tabletInfo.Tablet.Alias)
388+
if tabletAlias == analysisEntry.AnalyzedInstanceAlias {
389+
primaryTablet = tabletInfo.Tablet
390+
break
391+
}
392+
}
393+
394+
if primaryTablet == nil {
395+
logger.Errorf("Could not find primary tablet %s", analysisEntry.AnalyzedInstanceAlias)
396+
return false, topologyRecovery, fmt.Errorf("could not find primary tablet %s", analysisEntry.AnalyzedInstanceAlias)
397+
}
398+
399+
eg, _ := errgroup.WithContext(ctx)
400+
var restartExpected int
401+
var restartPerformed atomic.Int64
402+
// Iterate through all tablets and find direct replicas of the primary
403+
for _, tabletInfo := range tablets {
404+
tablet := tabletInfo.Tablet
405+
tabletAlias := topoproto.TabletAliasString(tablet.Alias)
406+
407+
// Skip the primary itself
408+
if tabletAlias == analysisEntry.AnalyzedInstanceAlias {
409+
continue
410+
}
411+
412+
if err := urgentOperations.Add(tabletAlias, true, cache.DefaultExpiration); err != nil {
413+
// Rate limit interval has not passed yet
414+
continue
415+
}
416+
417+
// Read the instance to check replication source
418+
instance, found, err := inst.ReadInstance(tabletAlias)
419+
if err != nil || !found {
420+
logger.Warningf("Could not read instance information for %s: %v", tabletAlias, err)
421+
continue
422+
}
423+
if instance.ReplicationDepth != 1 {
424+
// Not a direct replica of the primary
425+
continue
426+
}
427+
428+
restartExpected++
429+
eg.Go(func() error {
430+
logger.Infof("Restarting replication on direct replica %s", tabletAlias)
431+
_ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Restarting replication on direct replica %s", tabletAlias))
432+
433+
if err := tmc.StopReplication(ctx, tablet); err != nil {
434+
logger.Errorf("Failed to stop replication on %s: %v", tabletAlias, err)
435+
return err
436+
}
437+
438+
// Determine if this replica should use semi-sync based on the durability policy
439+
semiSync := policy.IsReplicaSemiSync(durabilityPolicy, primaryTablet, tablet)
440+
441+
if err := tmc.StartReplication(ctx, tablet, semiSync); err != nil {
442+
logger.Errorf("Failed to start replication on %s: %v", tabletAlias, err)
443+
return err
444+
}
445+
logger.Infof("Successfully restarted replication on %s", tabletAlias)
446+
restartPerformed.Add(1)
447+
return nil
448+
})
449+
}
450+
err = eg.Wait()
451+
message := fmt.Sprintf("Completed restart of %d/%d direct replicas for unreachable primary %+v. err=%+v", restartPerformed.Load(), restartExpected, analysisEntry.AnalyzedInstanceAlias, err)
452+
logger.Infof(message)
453+
_ = AuditTopologyRecovery(topologyRecovery, message)
454+
455+
if err != nil {
456+
return true, topologyRecovery, err
457+
}
458+
459+
return true, topologyRecovery, nil
460+
}
461+
343462
// isERSEnabled returns true if ERS can be used globally or for the given keyspace.
344463
func isERSEnabled(analysisEntry *inst.ReplicationAnalysis) bool {
345464
// If ERS is disabled globally we have no way of repairing the cluster.
@@ -405,7 +524,7 @@ func getCheckAndRecoverFunctionCode(analysisEntry *inst.ReplicationAnalysis) (re
405524
case inst.DeadPrimaryAndReplicas:
406525
recoveryFunc = recoverGenericProblemFunc
407526
case inst.UnreachablePrimary:
408-
recoveryFunc = recoverGenericProblemFunc
527+
recoveryFunc = restartDirectReplicasFunc
409528
case inst.UnreachablePrimaryWithLaggingReplicas:
410529
recoveryFunc = recoverGenericProblemFunc
411530
case inst.AllPrimaryReplicasNotReplicating:
@@ -430,6 +549,8 @@ func hasActionableRecovery(recoveryFunctionCode recoveryFunction) bool {
430549
return false
431550
case recoverGenericProblemFunc:
432551
return false
552+
case restartDirectReplicasFunc:
553+
return true
433554
case recoverDeadPrimaryFunc:
434555
return true
435556
case recoverPrimaryTabletDeletedFunc:
@@ -460,6 +581,8 @@ func getCheckAndRecoverFunction(recoveryFunctionCode recoveryFunction) (
460581
return nil
461582
case recoverGenericProblemFunc:
462583
return checkAndRecoverGenericProblem
584+
case restartDirectReplicasFunc:
585+
return restartDirectReplicas
463586
case recoverDeadPrimaryFunc:
464587
return recoverDeadPrimary
465588
case recoverPrimaryTabletDeletedFunc:
@@ -489,6 +612,8 @@ func getRecoverFunctionName(recoveryFunctionCode recoveryFunction) string {
489612
return ""
490613
case recoverGenericProblemFunc:
491614
return CheckAndRecoverGenericProblemRecoveryName
615+
case restartDirectReplicasFunc:
616+
return RestartDirectReplicasRecoveryName
492617
case recoverDeadPrimaryFunc:
493618
return RecoverDeadPrimaryRecoveryName
494619
case recoverPrimaryTabletDeletedFunc:

0 commit comments

Comments
 (0)