diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index fab5b80bdbf..25f82ca8bed 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -85,7 +85,7 @@ func TestAPIEndpoints(t *testing.T) { }) // Before we disable recoveries, let us wait until VTOrc has fixed all the issues (if any). - _, _ = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + _, _ = utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(i int, response string) bool { return response != "null" }) @@ -164,15 +164,15 @@ func TestAPIEndpoints(t *testing.T) { assert.Equal(t, "Global recoveries disabled\n", resp) }) - t.Run("Replication Analysis API", func(t *testing.T) { + t.Run("Detection Analysis API", func(t *testing.T) { // use vtctldclient to stop replication _, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("StopReplication", replica.Alias) require.NoError(t, err) // We know VTOrc won't fix this since we disabled global recoveries! // Wait until VTOrc picks up on this issue and verify - // that we see a not null result on the api/replication-analysis page - status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + // that we see a not null result on the api/detection-analysis page + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(_ int, response string) bool { return response == "null" }) assert.Equal(t, 200, status, resp) @@ -180,25 +180,25 @@ func TestAPIEndpoints(t *testing.T) { assert.Contains(t, resp, `"Analysis": "ReplicationStopped"`) // Verify that filtering also works in the API as intended - status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/detection-analysis?keyspace=ks&shard=0") require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"AnalyzedInstanceAlias": "%s"`, replica.Alias)) // Verify that filtering by keyspace also works in the API as intended - status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/detection-analysis?keyspace=ks") require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Contains(t, resp, fmt.Sprintf(`"AnalyzedInstanceAlias": "%s"`, replica.Alias)) // Check that filtering using keyspace and shard works - status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/detection-analysis?keyspace=ks&shard=80-") require.NoError(t, err) assert.Equal(t, 200, status, resp) assert.Equal(t, "null", resp) // Check that filtering using just the shard fails - status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0") + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/detection-analysis?shard=0") require.NoError(t, err) assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 6158fe5161f..ae56c2b46a6 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -681,7 +681,7 @@ func TestFullStatusConnectionPooling(t *testing.T) { _ = curPrimary.VttabletProcess.Kill() // Wait until VTOrc notices some problems - status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(_ int, response string) bool { return response == "null" }) assert.Equal(t, 200, status) @@ -697,7 +697,7 @@ func TestFullStatusConnectionPooling(t *testing.T) { // See that VTOrc eventually reports no errors. // Wait until there are no problems and the api endpoint returns null - status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(_ int, response string) bool { return response != "null" }) assert.Equal(t, 200, status) @@ -708,7 +708,7 @@ func TestFullStatusConnectionPooling(t *testing.T) { _ = curPrimary.VttabletProcess.Kill() // Wait until VTOrc notices some problems - status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(_ int, response string) bool { return response == "null" }) assert.Equal(t, 200, status) @@ -724,7 +724,7 @@ func TestFullStatusConnectionPooling(t *testing.T) { // See that VTOrc eventually reports no errors. // Wait until there are no problems and the api endpoint returns null - status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/detection-analysis", func(_ int, response string) bool { return response != "null" }) assert.Equal(t, 200, status) diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go index 6a783da3447..5d27fcb68e9 100644 --- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go +++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go @@ -26,7 +26,7 @@ import ( "vitess.io/vitess/go/test/endtoend/vtorc/utils" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/servenv" - + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/logic" @@ -77,7 +77,13 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { } } - primaryInstance, err := inst.ReadTopologyInstanceBufferable(primary.Alias, nil) + primaryAlias, err := topoproto.ParseTabletAlias(primary.Alias) + require.NoError(t, err) + + replicaAlias, err := topoproto.ParseTabletAlias(replica.Alias) + require.NoError(t, err) + + primaryInstance, err := inst.ReadTopologyInstanceBufferable(primaryAlias, nil) require.NoError(t, err) require.NotNil(t, primaryInstance) assert.Equal(t, utils.Hostname, primaryInstance.Hostname) @@ -129,7 +135,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { err = logic.EnableRecovery() require.NoError(t, err) - replicaInstance, err := inst.ReadTopologyInstanceBufferable(replica.Alias, nil) + replicaInstance, err := inst.ReadTopologyInstanceBufferable(replicaAlias, nil) require.NoError(t, err) require.NotNil(t, replicaInstance) assert.Equal(t, utils.Hostname, replicaInstance.Hostname) diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index c2f5e8ae85a..292daa67f7a 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -87,8 +87,8 @@ type DetectionAnalysisHints struct { // DetectionAnalysis represents an analysis of a detected problem. type DetectionAnalysis struct { - AnalyzedInstanceAlias string - AnalyzedInstancePrimaryAlias string + AnalyzedInstanceAlias *topodatapb.TabletAlias + AnalyzedInstancePrimaryAlias *topodatapb.TabletAlias TabletType topodatapb.TabletType CurrentTabletType topodatapb.TabletType PrimaryTimeStamp time.Time diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 19560a69b07..ce7f996b636 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -53,7 +53,7 @@ func initializeAnalysisDaoPostConfiguration() { type clusterAnalysis struct { hasShardWideAction bool totalTablets int - primaryAlias string + primaryAlias *topodatapb.TabletAlias durability policy.Durabler } @@ -319,8 +319,8 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi a.ShardPrimaryTermTimestamp = m.GetTime("shard_primary_term_timestamp") a.IsPrimary = m.GetBool("is_primary") - a.AnalyzedInstanceAlias = topoproto.TabletAliasString(tablet.Alias) - a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias) + a.AnalyzedInstanceAlias = tablet.Alias + a.AnalyzedInstancePrimaryAlias = primaryTablet.Alias a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{ LogFile: m.GetString("binary_log_file"), LogPos: m.GetUint64("binary_log_pos"), @@ -461,12 +461,12 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi case topo.IsReplicaType(a.TabletType) && a.ErrantGTID != "": a.Analysis = ErrantGTIDDetected a.Description = "Tablet has errant GTIDs" - case topo.IsReplicaType(a.TabletType) && ca.primaryAlias == "" && a.ShardPrimaryTermTimestamp.IsZero(): + case topo.IsReplicaType(a.TabletType) && ca.primaryAlias == nil && a.ShardPrimaryTermTimestamp.IsZero(): // ClusterHasNoPrimary should only be detected when the shard record doesn't have any primary term start time specified either. a.Analysis = ClusterHasNoPrimary a.Description = "Cluster has no primary" ca.hasShardWideAction = true - case topo.IsReplicaType(a.TabletType) && ca.primaryAlias == "" && !a.ShardPrimaryTermTimestamp.IsZero(): + case topo.IsReplicaType(a.TabletType) && ca.primaryAlias == nil && !a.ShardPrimaryTermTimestamp.IsZero(): // If there are no primary tablets, but the shard primary start time isn't empty, then we know // the primary tablet was deleted. a.Analysis = PrimaryTabletDeleted @@ -491,7 +491,7 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi a.Analysis = ReplicaMisconfigured a.Description = "Replica has been misconfigured" // - case topo.IsReplicaType(a.TabletType) && !a.IsPrimary && ca.primaryAlias != "" && a.AnalyzedInstancePrimaryAlias != ca.primaryAlias: + case topo.IsReplicaType(a.TabletType) && !a.IsPrimary && ca.primaryAlias != nil && !topoproto.TabletAliasEqual(a.AnalyzedInstancePrimaryAlias, ca.primaryAlias): a.Analysis = ConnectedToWrongPrimary a.Description = "Connected to wrong primary" // @@ -659,12 +659,13 @@ func postProcessAnalyses(result []*DetectionAnalysis, clusters map[string]*clust // auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table. // To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to // analysis codes are written. -func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisCode) error { - if lastWrittenAnalysis, found := recentInstantAnalysis.Get(tabletAlias); found { +func auditInstanceAnalysisInChangelog(tabletAlias *topodatapb.TabletAlias, analysisCode AnalysisCode) error { + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + if lastWrittenAnalysis, found := recentInstantAnalysis.Get(tabletAliasString); found { if lastWrittenAnalysis == analysisCode { // Surely nothing new. // And let's expand the timeout - recentInstantAnalysis.Set(tabletAlias, analysisCode, cache.DefaultExpiration) + recentInstantAnalysis.Set(tabletAliasString, analysisCode, cache.DefaultExpiration) return nil } } @@ -680,7 +681,9 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC alias = ? AND analysis != ? `, - string(analysisCode), tabletAlias, string(analysisCode), + string(analysisCode), + tabletAliasString, + string(analysisCode), ) if err != nil { log.Error(err) @@ -709,7 +712,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC DATETIME('now'), ? )`, - tabletAlias, string(analysisCode), + tabletAliasString, + string(analysisCode), ) if err != nil { log.Error(err) @@ -722,7 +726,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC } firstInsertion = rows > 0 } - recentInstantAnalysis.Set(tabletAlias, analysisCode, cache.DefaultExpiration) + recentInstantAnalysis.Set(tabletAliasString, analysisCode, cache.DefaultExpiration) // If the analysis has changed or if it is the first insertion, we need to make sure we write this change to the database. if !lastAnalysisChanged && !firstInsertion { return nil @@ -738,7 +742,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC DATETIME('now'), ? )`, - tabletAlias, string(analysisCode), + tabletAliasString, + string(analysisCode), ) if err == nil { analysisChangeWriteCounter.Add(1) diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index 0ccc08fd4c8..eefb38df3ac 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -1106,24 +1106,24 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) { }() updates := []struct { - tabletAlias string + tabletAlias *topodatapb.TabletAlias analysisCode AnalysisCode writeCounterExpectation int64 wantErr string }{ { // Store a new analysis for the zone1-100 tablet. - tabletAlias: "zone1-100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, analysisCode: ReplicationStopped, writeCounterExpectation: 1, }, { // Write the same analysis, no new write should happen. - tabletAlias: "zone1-100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, analysisCode: ReplicationStopped, writeCounterExpectation: 1, }, { // Change the analysis. This should trigger an update. - tabletAlias: "zone1-100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, analysisCode: ReplicaSemiSyncMustBeSet, writeCounterExpectation: 2, }, @@ -1192,21 +1192,21 @@ func TestPostProcessAnalyses(t *testing.T) { analyses: []*DetectionAnalysis{ { Analysis: InvalidPrimary, - AnalyzedInstanceAlias: "zone1-100", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_PRIMARY, }, { Analysis: NoProblem, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-202", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 202}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, TabletType: topodatapb.TabletType_RDONLY, }, { Analysis: ConnectedToWrongPrimary, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-101", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_REPLICA, @@ -1214,21 +1214,21 @@ func TestPostProcessAnalyses(t *testing.T) { }, { Analysis: ReplicationStopped, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-102", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 102}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_RDONLY, ReplicationStopped: true, }, { Analysis: InvalidReplica, - AnalyzedInstanceAlias: "zone1-108", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 108}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_REPLICA, LastCheckValid: false, }, { Analysis: NoProblem, - AnalyzedInstanceAlias: "zone1-302", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 302}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, LastCheckValid: true, @@ -1238,21 +1238,21 @@ func TestPostProcessAnalyses(t *testing.T) { want: []*DetectionAnalysis{ { Analysis: DeadPrimary, - AnalyzedInstanceAlias: "zone1-100", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_PRIMARY, }, { Analysis: NoProblem, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-202", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 202}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, TabletType: topodatapb.TabletType_RDONLY, }, { Analysis: NoProblem, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-302", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 302}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, TabletType: topodatapb.TabletType_REPLICA, @@ -1264,13 +1264,13 @@ func TestPostProcessAnalyses(t *testing.T) { analyses: []*DetectionAnalysis{ { Analysis: InvalidPrimary, - AnalyzedInstanceAlias: "zone1-100", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_PRIMARY, }, { Analysis: NoProblem, - AnalyzedInstanceAlias: "zone1-202", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 202}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, LastCheckValid: true, @@ -1278,14 +1278,14 @@ func TestPostProcessAnalyses(t *testing.T) { }, { Analysis: NoProblem, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-101", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_REPLICA, }, { Analysis: ReplicationStopped, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-102", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 102}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard0, TabletType: topodatapb.TabletType_RDONLY, @@ -1293,7 +1293,7 @@ func TestPostProcessAnalyses(t *testing.T) { }, { Analysis: NoProblem, LastCheckValid: true, - AnalyzedInstanceAlias: "zone1-302", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 302}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard80, TabletType: topodatapb.TabletType_REPLICA, diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index 7ae60fba927..0d75c87769d 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -23,6 +23,8 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" ) @@ -30,12 +32,12 @@ import ( var auditOperationCounter = stats.NewCounter("AuditWrite", "Number of audit operations performed") // AuditOperation creates and writes a new audit entry by given params -func AuditOperation(auditType string, tabletAlias string, message string) error { - keyspace := "" - shard := "" - if tabletAlias != "" { +func AuditOperation(auditType string, tabletAlias *topodatapb.TabletAlias, message string) error { + var keyspace, shard string + if tabletAlias != nil { keyspace, shard, _ = GetKeyspaceShardName(tabletAlias) } + tabletAliasString := topoproto.TabletAliasString(tabletAlias) auditWrittenToFile := false if config.GetAuditFileLocation() != "" { @@ -48,7 +50,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error } defer f.Close() - text := fmt.Sprintf("%s\t%s\t%s\t[%s:%s]\t%s\t\n", time.Now().Format("2006-01-02 15:04:05"), auditType, tabletAlias, keyspace, shard, message) + text := fmt.Sprintf("%s\t%s\t%s\t[%s:%s]\t%s\t\n", time.Now().Format("2006-01-02 15:04:05"), auditType, tabletAliasString, keyspace, shard, message) if _, err = f.WriteString(text); err != nil { log.Error(err) } @@ -72,7 +74,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error ? )`, auditType, - tabletAlias, + tabletAliasString, keyspace, shard, message, @@ -82,7 +84,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error return err } } - logMessage := fmt.Sprintf("auditType:%s alias:%s keyspace:%s shard:%s message:%s", auditType, tabletAlias, keyspace, shard, message) + logMessage := fmt.Sprintf("auditType:%s alias:%s keyspace:%s shard:%s message:%s", auditType, tabletAliasString, keyspace, shard, message) if syslogMessage(logMessage) { auditWrittenToFile = true } diff --git a/go/vt/vtorc/inst/audit_dao_test.go b/go/vt/vtorc/inst/audit_dao_test.go index d22e9177dc3..011a14d6d89 100644 --- a/go/vt/vtorc/inst/audit_dao_test.go +++ b/go/vt/vtorc/inst/audit_dao_test.go @@ -73,7 +73,6 @@ func TestAuditOperation(t *testing.T) { err = SaveTablet(tab100) require.NoError(t, err) - tab100Alias := topoproto.TabletAliasString(tab100.Alias) auditType := "test-audit-operation" message := "test-message" @@ -83,26 +82,26 @@ func TestAuditOperation(t *testing.T) { config.SetAuditToBackend(true) // Auditing should succeed as expected - err = AuditOperation(auditType, tab100Alias, message) + err = AuditOperation(auditType, tab100.Alias, message) require.NoError(t, err) // Check that we can read the recent audits - audits, err := readRecentAudit(tab100Alias, 0) + audits, err := readRecentAudit(tab100.Alias, 0) require.NoError(t, err) require.Len(t, audits, 1) require.EqualValues(t, 1, audits[0].AuditID) require.EqualValues(t, auditType, audits[0].AuditType) require.EqualValues(t, message, audits[0].Message) - require.EqualValues(t, tab100Alias, audits[0].AuditTabletAlias) + testRequireTabletAliasEqual(t, tab100.Alias, audits[0].AuditTabletAlias) // Check the same for no-filtering - audits, err = readRecentAudit("", 0) + audits, err = readRecentAudit(nil, 0) require.NoError(t, err) require.Len(t, audits, 1) require.EqualValues(t, 1, audits[0].AuditID) require.EqualValues(t, auditType, audits[0].AuditType) require.EqualValues(t, message, audits[0].Message) - require.EqualValues(t, tab100Alias, audits[0].AuditTabletAlias) + testRequireTabletAliasEqual(t, tab100.Alias, audits[0].AuditTabletAlias) }) t.Run("audit to File", func(t *testing.T) { @@ -114,7 +113,7 @@ func TestAuditOperation(t *testing.T) { defer os.Remove(file.Name()) config.SetAuditFileLocation(file.Name()) - err = AuditOperation(auditType, tab100Alias, message) + err = AuditOperation(auditType, tab100.Alias, message) require.NoError(t, err) // Give a little time for the write to succeed since it happens in a separate go-routine @@ -132,42 +131,46 @@ type audit struct { AuditID int64 AuditTimestamp string AuditType string - AuditTabletAlias string + AuditTabletAlias *topodatapb.TabletAlias Message string } // readRecentAudit returns a list of audit entries order chronologically descending, using page number. -func readRecentAudit(tabletAlias string, page int) ([]audit, error) { +func readRecentAudit(tabletAlias *topodatapb.TabletAlias, page int) ([]audit, error) { res := []audit{} var args []any whereCondition := `` - if tabletAlias != "" { - whereCondition = `where alias=?` - args = append(args, tabletAlias) + if tabletAlias != nil { + whereCondition = `WHERE alias = ?` + args = append(args, topoproto.TabletAliasString(tabletAlias)) } query := fmt.Sprintf(` - select + SELECT audit_id, audit_timestamp, audit_type, alias, message - from + FROM audit %s - order by - audit_timestamp desc - limit ? - offset ? - `, whereCondition) + ORDER BY + audit_timestamp DESC + LIMIT ? + OFFSET ?`, + whereCondition, + ) args = append(args, config.AuditPageSize, page*config.AuditPageSize) - err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { + err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) (err error) { a := audit{} a.AuditID = m.GetInt64("audit_id") a.AuditTimestamp = m.GetString("audit_timestamp") a.AuditType = m.GetString("audit_type") - a.AuditTabletAlias = m.GetString("alias") a.Message = m.GetString("message") + a.AuditTabletAlias, err = topoproto.ParseTabletAlias(m.GetString("alias")) + if err != nil { + return err + } res = append(res, a) return nil diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go index 2d3455c2f68..88ab6c04dbb 100644 --- a/go/vt/vtorc/inst/instance.go +++ b/go/vt/vtorc/inst/instance.go @@ -30,7 +30,7 @@ import ( type Instance struct { Hostname string Port int - InstanceAlias string + InstanceAlias *topodatapb.TabletAlias // TabletType is the tablet type that the instance // publishes to the VtGates and believes itself to be. TabletType topodatapb.TabletType diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 4fc9064ea2a..8c8f773a281 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -73,6 +73,10 @@ func init() { func initializeInstanceDao() { config.WaitForConfigurationToBeLoaded() + InitializeForgetAliasesCache() +} + +func InitializeForgetAliasesCache() { forgetAliases = cache.New(config.GetInstancePollTime()*3, time.Second) cacheInitializationCompleted.Store(true) } @@ -118,19 +122,20 @@ func ExpireTableData(tableName string, timestampColumn string) error { // logReadTopologyInstanceError logs an error, if applicable, for a ReadTopologyInstance operation, // providing context and hint as for the source of the error. If there's no hint just provide the // original error. -func logReadTopologyInstanceError(tabletAlias string, hint string, err error) error { +func logReadTopologyInstanceError(tabletAlias *topodatapb.TabletAlias, hint string, err error) error { if err == nil { return nil } - if !util.ClearToLog("ReadTopologyInstance", tabletAlias) { + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + if !util.ClearToLog("ReadTopologyInstance", tabletAliasString) { return err } var msg string if hint == "" { - msg = fmt.Sprintf("ReadTopologyInstance(%+v): %+v", tabletAlias, err) + msg = fmt.Sprintf("ReadTopologyInstance(%+v): %+v", tabletAliasString, err) } else { msg = fmt.Sprintf("ReadTopologyInstance(%+v) %+v: %+v", - tabletAlias, + tabletAliasString, strings.ReplaceAll(hint, "%", "%%"), // escape % err) } @@ -151,7 +156,7 @@ func RegisterStats() { // It writes the information retrieved into vtorc's backend. // - writes are optionally buffered. // - timing information can be collected for the stages performed. -func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) { +func ReadTopologyInstanceBufferable(tabletAlias *topodatapb.TabletAlias, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) { defer func() { if r := recover(); r != nil { err = logReadTopologyInstanceError(tabletAlias, "Unexpected, aborting", tb.Errorf("%+v", r)) @@ -168,7 +173,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named partialSuccess := false errorChan := make(chan error, 32) - if tabletAlias == "" { + if tabletAlias == nil { return instance, errors.New("ReadTopologyInstance will not act on empty tablet alias") } @@ -325,7 +330,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named // ------------------------------------------------------------------------- instance.Cell = tablet.Alias.Cell - instance.InstanceAlias = topoproto.TabletAliasString(tablet.Alias) + instance.InstanceAlias = tablet.Alias { latency.Start("backend") @@ -395,10 +400,10 @@ func detectErrantGTIDs(instance *Instance, tablet *topodatapb.Tablet) (err error if instance.primaryExecutedGtidSet == "" && instance.SourceHost == "" { var primaryInstance *Instance primaryAlias, _, _ := ReadShardPrimaryInformation(tablet.Keyspace, tablet.Shard) - if primaryAlias != "" { + if primaryAlias != nil { // Check if the current tablet is the primary. // If it is, then we don't need to run errant gtid detection on it. - if primaryAlias == instance.InstanceAlias { + if topoproto.TabletAliasEqual(primaryAlias, instance.InstanceAlias) { return nil } primaryInstance, _, _ = ReadInstance(primaryAlias) @@ -443,7 +448,7 @@ func detectErrantGTIDs(instance *Instance, tablet *topodatapb.Tablet) (err error errantGtidSet := redactedExecutedGtidSet.Difference(redactedPrimaryExecutedGtidSet) if !errantGtidSet.Empty() { instance.GtidErrant = errantGtidSet.String() - currentErrantGTIDCount.Set(instance.InstanceAlias, errantGtidSet.Count()) + currentErrantGTIDCount.Set(topoproto.TabletAliasString(instance.InstanceAlias), errantGtidSet.Count()) } } } @@ -518,7 +523,7 @@ func ReadInstanceClusterAttributes(instance *Instance) (err error) { } // readInstanceRow reads a single instance row from the vtorc backend database. -func readInstanceRow(m sqlutils.RowMap) *Instance { +func readInstanceRow(m sqlutils.RowMap) (*Instance, error) { instance := NewInstance() instance.Hostname = m.GetString("hostname") @@ -584,9 +589,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.IsLastCheckValid = m.GetBool("is_last_check_valid") instance.SecondsSinceLastSeen = m.GetNullInt64("seconds_since_last_seen") instance.AllowTLS = m.GetBool("allow_tls") - instance.InstanceAlias = m.GetString("alias") instance.LastDiscoveryLatency = time.Duration(m.GetInt64("last_discovery_latency")) * time.Nanosecond + var err error + instance.InstanceAlias, err = topoproto.ParseTabletAlias(m.GetString("alias")) + if err != nil { + return instance, err + } + instance.applyFlavorName() // problems @@ -603,13 +613,13 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.Problems = append(instance.Problems, "errant_gtid") } - return instance + return instance, nil } // readInstancesByCondition is a generic function to read instances from the backend database func readInstancesByCondition(condition string, args []any, sort string) ([](*Instance), error) { readFunc := func() ([]*Instance, error) { - var instances []*Instance + instances := make([]*Instance, 0) if sort == "" { sort = `alias` @@ -632,7 +642,10 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In ) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { - instance := readInstanceRow(m) + instance, err := readInstanceRow(m) + if err != nil { + return err + } instances = append(instances, instance) return nil }) @@ -655,9 +668,10 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In } // ReadInstance reads an instance from the vtorc backend database -func ReadInstance(tabletAlias string) (*Instance, bool, error) { +func ReadInstance(tabletAlias *topodatapb.TabletAlias) (*Instance, bool, error) { condition := `alias = ?` - instances, err := readInstancesByCondition(condition, sqlutils.Args(tabletAlias), "") + args := sqlutils.Args(topoproto.TabletAliasString(tabletAlias)) + instances, err := readInstancesByCondition(condition, args, "") // We know there will be at most one (alias is the PK). // And we expect to find one. readInstanceCounter.Add(1) @@ -671,7 +685,7 @@ func ReadInstance(tabletAlias string) (*Instance, bool, error) { } // ReadProblemInstances reads all instances with problems -func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) { +func ReadProblemInstances(keyspace, shard string) ([]*Instance, error) { condition := ` keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) AND shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) @@ -685,12 +699,15 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) OR (gtid_errant != '') )` - args := sqlutils.Args(keyspace, keyspace, shard, shard, config.GetInstancePollSeconds()*5, config.GetReasonableReplicationLagSeconds(), config.GetReasonableReplicationLagSeconds()) + args := sqlutils.Args(keyspace, keyspace, shard, shard, config.GetInstancePollSeconds()*5, + config.GetReasonableReplicationLagSeconds(), + config.GetReasonableReplicationLagSeconds(), + ) return readInstancesByCondition(condition, args, "") } // ReadInstancesWithErrantGTIds reads all instances with errant GTIDs -func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, error) { +func ReadInstancesWithErrantGTIds(keyspace, shard string) ([]*Instance, error) { condition := ` keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) AND shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) @@ -701,7 +718,7 @@ func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, e } // GetKeyspaceShardName gets the keyspace shard name for the given instance key -func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, err error) { +func GetKeyspaceShardName(tabletAlias *topodatapb.TabletAlias) (keyspace, shard string, err error) { query := `SELECT keyspace, shard @@ -710,7 +727,8 @@ func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, er WHERE alias = ? ` - err = db.QueryVTOrc(query, sqlutils.Args(tabletAlias), func(m sqlutils.RowMap) error { + args := sqlutils.Args(topoproto.TabletAliasString(tabletAlias)) + err = db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { keyspace = m.GetString("keyspace") shard = m.GetString("shard") return nil @@ -721,7 +739,7 @@ func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, er return keyspace, shard, err } -// ReadOutdatedInstanceKeys reads and returns keys for all instances that are not up to date (i.e. +// ReadOutdatedInstances reads and returns tablet aliases for all instances that are not up to date (i.e. // pre-configured time has passed since they were last checked) or the ones whose tablet information was read // but not the mysql information. This could happen if the durability policy of the keyspace wasn't // available at the time it was discovered. This would lead to not having the record of the tablet in the @@ -730,8 +748,8 @@ func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, er // resulted in an actual check! This can happen when TCP/IP connections are hung, in which case the "check" // never returns. In such case we multiply interval by a factor, so as not to open too many connections on // the instance. -func ReadOutdatedInstanceKeys() ([]string, error) { - var res []string +func ReadOutdatedInstances() ([]*topodatapb.TabletAlias, error) { + res := make([]*topodatapb.TabletAlias, 0) query := `SELECT alias FROM @@ -750,17 +768,19 @@ func ReadOutdatedInstanceKeys() ([]string, error) { vitess_tablet.alias = database_instance.alias ) WHERE - database_instance.alias IS NULL - ` + database_instance.alias IS NULL` args := sqlutils.Args(config.GetInstancePollSeconds(), 2*config.GetInstancePollSeconds()) err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { - tabletAlias := m.GetString("alias") + tabletAlias, err := topoproto.ParseTabletAlias(m.GetString("alias")) + if err != nil { + return err + } if !InstanceIsForgotten(tabletAlias) { // only if not in "forget" cache res = append(res, tabletAlias) } - // We don;t return an error because we want to keep filling the outdated instances list. + // We don't return an error because we want to keep filling the outdated instances list. return nil }) if err != nil { @@ -900,7 +920,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, for _, instance := range instances { // number of columns minus 2 as last_checked and last_attempted_check // updated with NOW() - args = append(args, instance.InstanceAlias) + args = append(args, topoproto.TabletAliasString(instance.InstanceAlias)) args = append(args, instance.Hostname) args = append(args, instance.Port) args = append(args, instance.Cell) @@ -1006,7 +1026,7 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError // UpdateInstanceLastChecked updates the last_check timestamp in the vtorc backed database // for a given instance -func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledDisk bool) error { +func UpdateInstanceLastChecked(tabletAlias *topodatapb.TabletAlias, partialSuccess bool, stalledDisk bool) error { writeFunc := func() error { _, err := db.ExecVTOrc(`UPDATE database_instance SET @@ -1018,7 +1038,7 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledD `, partialSuccess, stalledDisk, - tabletAlias, + topoproto.TabletAliasString(tabletAlias), ) if err != nil { log.Error(err) @@ -1036,7 +1056,7 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledD // And so we make sure to note down *before* we even attempt to access the instance; and this raises a red flag when we // wish to access the instance again: if last_attempted_check is *newer* than last_checked, that's bad news and means // we have a "hanging" issue. -func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { +func UpdateInstanceLastAttemptedCheck(tabletAlias *topodatapb.TabletAlias) error { writeFunc := func() error { _, err := db.ExecVTOrc(`UPDATE database_instance SET @@ -1044,7 +1064,7 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { WHERE alias = ? `, - tabletAlias, + topoproto.TabletAliasString(tabletAlias), ) if err != nil { log.Error(err) @@ -1054,32 +1074,34 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { return ExecDBWriteFunc(writeFunc) } -func InstanceIsForgotten(tabletAlias string) bool { - _, found := forgetAliases.Get(tabletAlias) +// InstanceIsForgotten returns true if an instance was forgotten. +func InstanceIsForgotten(tabletAlias *topodatapb.TabletAlias) bool { + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + _, found := forgetAliases.Get(tabletAliasString) return found } // ForgetInstance removes an instance entry from the vtorc backed database. // It may be auto-rediscovered through topology or requested for discovery by multiple means. -func ForgetInstance(tabletAlias string) error { - if tabletAlias == "" { +func ForgetInstance(tabletAlias *topodatapb.TabletAlias) error { + if tabletAlias == nil { errMsg := "ForgetInstance(): empty tabletAlias" log.Errorf(errMsg) return errors.New(errMsg) } - forgetAliases.Set(tabletAlias, true, cache.DefaultExpiration) - log.Infof("Forgetting: %v", tabletAlias) + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + forgetAliases.Set(tabletAliasString, true, cache.DefaultExpiration) + log.Infof("Forgetting: %v", tabletAliasString) // Remove this tablet from errant GTID count metric. - currentErrantGTIDCount.Reset(tabletAlias) + currentErrantGTIDCount.Reset(tabletAliasString) // Delete from the 'vitess_tablet' table. - _, err := db.ExecVTOrc(`DELETE - FROM vitess_tablet + _, err := db.ExecVTOrc(`DELETE FROM + vitess_tablet WHERE - alias = ? - `, - tabletAlias, + alias = ?`, + tabletAliasString, ) if err != nil { log.Error(err) @@ -1087,12 +1109,11 @@ func ForgetInstance(tabletAlias string) error { } // Also delete from the 'database_instance' table. - sqlResult, err := db.ExecVTOrc(`DELETE - FROM database_instance + sqlResult, err := db.ExecVTOrc(`DELETE FROM + database_instance WHERE - alias = ? - `, - tabletAlias, + alias = ?`, + tabletAliasString, ) if err != nil { log.Error(err) @@ -1105,7 +1126,7 @@ func ForgetInstance(tabletAlias string) error { return err } if rows == 0 { - errMsg := fmt.Sprintf("ForgetInstance(): tablet %+v not found", tabletAlias) + errMsg := fmt.Sprintf("ForgetInstance(): tablet %+v not found", tabletAliasString) log.Error(errMsg) return errors.New(errMsg) } @@ -1132,7 +1153,7 @@ func ForgetLongUnseenInstances() error { return err } if rows > 0 { - _ = AuditOperation("forget-unseen", "", fmt.Sprintf("Forgotten instances: %d", rows)) + _ = AuditOperation("forget-unseen", nil, fmt.Sprintf("Forgotten instances: %d", rows)) } return err } @@ -1177,8 +1198,8 @@ func ExpireStaleInstanceBinlogCoordinates() error { expireSeconds = config.StaleInstanceCoordinatesExpireSeconds } writeFunc := func() error { - _, err := db.ExecVTOrc(`DELETE - FROM database_instance_stale_binlog_coordinates + _, err := db.ExecVTOrc(`DELETE FROM + database_instance_stale_binlog_coordinates WHERE first_seen < DATETIME('now', PRINTF('-%d SECOND', ?)) `, @@ -1199,7 +1220,7 @@ func GetDatabaseState() (string, error) { Rows []sqlutils.RowMap } - var dbState []tableState + dbState := make([]tableState, 0, len(db.TableNames)) for _, tableName := range db.TableNames { ts := tableState{ TableName: tableName, diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 9c5dcced26b..dad155a3996 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -38,9 +38,9 @@ func stripSpaces(s string) string { } func mkTestInstances() []*Instance { - i710 := Instance{InstanceAlias: "zone1-i710", Hostname: "i710", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_PRIMARY, ServerID: 710, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 10}} - i720 := Instance{InstanceAlias: "zone1-i720", Hostname: "i720", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_REPLICA, ServerID: 720, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 20}} - i730 := Instance{InstanceAlias: "zone1-i730", Hostname: "i730", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_REPLICA, ServerID: 730, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 30}} + i710 := Instance{InstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 710}, Hostname: "i710", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_PRIMARY, ServerID: 710, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 10}} + i720 := Instance{InstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 720}, Hostname: "i720", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_REPLICA, ServerID: 720, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 20}} + i730 := Instance{InstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 730}, Hostname: "i730", Port: 3306, Cell: "zone1", TabletType: topodatapb.TabletType_REPLICA, ServerID: 730, ExecBinlogCoordinates: BinlogCoordinates{LogFile: "mysql.000007", LogPos: 30}} instances := []*Instance{&i710, &i720, &i730} for _, instance := range instances { instance.Version = "5.6.7" @@ -69,7 +69,7 @@ func TestMkInsertSingle(t *testing.T) { VALUES (?, ?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` - a1 := `zone1-i710, i710, 3306, zone1, 1, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, + a1 := `zone1-0000000710, i710, 3306, zone1, 1, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false,` @@ -95,9 +95,9 @@ func TestMkInsertThree(t *testing.T) { (?, ?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a3 := ` - zone1-i710, i710, 3306, zone1, 1, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false ,false, 0, false, - zone1-i720, i720, 3306, zone1, 2, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, - zone1-i730, i730, 3306, zone1, 2, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, + zone1-0000000710, i710, 3306, zone1, 1, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false ,false, 0, false, + zone1-0000000720, i720, 3306, zone1, 2, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, + zone1-0000000730, i730, 3306, zone1, 2, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, ` sql3, args3, err := mkInsertForInstances(instances[:3], true, true) @@ -143,7 +143,7 @@ func TestGetKeyspaceShardName(t *testing.T) { err = SaveTablet(tab100) require.NoError(t, err) - keyspaceRead, shardRead, err := GetKeyspaceShardName(topoproto.TabletAliasString(tab100.Alias)) + keyspaceRead, shardRead, err := GetKeyspaceShardName(tab100.Alias) require.NoError(t, err) require.Equal(t, ks, keyspaceRead) require.Equal(t, shard, shardRead) @@ -153,16 +153,16 @@ func TestGetKeyspaceShardName(t *testing.T) { func TestReadInstance(t *testing.T) { tests := []struct { name string - tabletAliasToRead string + tabletAliasToRead *topodatapb.TabletAlias instanceFound bool }{ { name: "Read success", - tabletAliasToRead: "zone1-0000000100", + tabletAliasToRead: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, instanceFound: true, }, { name: "Unknown tablet", - tabletAliasToRead: "unknown-tablet", + tabletAliasToRead: &topodatapb.TabletAlias{Cell: "unknown", Uid: 123456789}, instanceFound: false, }, } @@ -264,7 +264,7 @@ func TestReadProblemInstances(t *testing.T) { require.NoError(t, err) var tabletAliases []string for _, instance := range instances { - tabletAliases = append(tabletAliases, instance.InstanceAlias) + tabletAliases = append(tabletAliases, topoproto.TabletAliasString(instance.InstanceAlias)) } require.ElementsMatch(t, tabletAliases, tt.instancesRequired) }) @@ -348,7 +348,7 @@ func TestReadInstancesWithErrantGTIds(t *testing.T) { require.NoError(t, err) var tabletAliases []string for _, instance := range instances { - tabletAliases = append(tabletAliases, instance.InstanceAlias) + tabletAliases = append(tabletAliases, topoproto.TabletAliasString(instance.InstanceAlias)) } require.ElementsMatch(t, tabletAliases, tt.instancesRequired) }) @@ -369,7 +369,7 @@ func TestReadInstanceAllFields(t *testing.T) { wantInstance := &Instance{ Hostname: "localhost", Port: 6711, - InstanceAlias: "zone1-0000000100", + InstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, TabletType: topodatapb.TabletType_REPLICA, Cell: "zone1", ServerID: 1094500338, @@ -450,7 +450,7 @@ func TestReadInstanceAllFields(t *testing.T) { LastDiscoveryLatency: 0, } - instance, found, err := ReadInstance(`zone1-0000000100`) + instance, found, err := ReadInstance(&topodatapb.TabletAlias{Cell: "zone1", Uid: 100}) require.NoError(t, err) require.True(t, found) instance.SecondsSinceLastSeen = sql.NullInt64{} @@ -516,15 +516,15 @@ func TestReadInstancesByCondition(t *testing.T) { require.NoError(t, err) var tabletAliases []string for _, instance := range instances { - tabletAliases = append(tabletAliases, instance.InstanceAlias) + tabletAliases = append(tabletAliases, topoproto.TabletAliasString(instance.InstanceAlias)) } require.EqualValues(t, tt.instancesRequired, tabletAliases) }) } } -// TestReadOutdatedInstanceKeys is used to test the functionality of ReadOutdatedInstanceKeys and verify its failure modes and successes. -func TestReadOutdatedInstanceKeys(t *testing.T) { +// TestReadOutdatedInstances is used to test the functionality of ReadOutdatedInstances and verify its failure modes and successes. +func TestReadOutdatedInstances(t *testing.T) { // The test is intended to be used as follows. The initial data is stored into the database. Following this, some specific queries are run that each individual test specifies to get the desired state. tests := []struct { name string @@ -556,7 +556,7 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { "update database_instance set last_checked = DATETIME('now', '-1 hour') where alias = 'zone1-0000000100'", `INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`, }, - instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"}, + instancesRequired: []string{"zone1-0000000100", "zone1-0000000103"}, }, } @@ -585,7 +585,7 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { require.NoError(t, err) } - tabletAliases, err := ReadOutdatedInstanceKeys() + tabletAliases, err := ReadOutdatedInstances() errInDataCollection := db.QueryVTOrcRowsMap(`select alias, last_checked, @@ -600,7 +600,11 @@ from database_instance`, func(rowMap sqlutils.RowMap) error { }) require.NoError(t, errInDataCollection) require.NoError(t, err) - require.ElementsMatch(t, tabletAliases, tt.instancesRequired) + var tabletAliasStrings []string + for _, tabletAlias := range tabletAliases { + tabletAliasStrings = append(tabletAliasStrings, topoproto.TabletAliasString(tabletAlias)) + } + require.EqualValues(t, tt.instancesRequired, tabletAliasStrings) }) } } @@ -609,32 +613,32 @@ from database_instance`, func(rowMap sqlutils.RowMap) error { func TestUpdateInstanceLastChecked(t *testing.T) { tests := []struct { name string - tabletAlias string + tabletAlias *topodatapb.TabletAlias partialSuccess bool stalledDisk bool conditionToCheck string }{ { name: "Verify updated last checked", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, partialSuccess: false, stalledDisk: false, conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = false", }, { name: "Verify partial success", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, partialSuccess: true, stalledDisk: false, conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true and is_disk_stalled = false", }, { name: "Verify stalled disk", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, partialSuccess: false, stalledDisk: true, conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = true", }, { - name: "Verify no error on unknown tablet", - tabletAlias: "unknown tablet", + name: "Verify no error on nil tablet", + tabletAlias: nil, partialSuccess: true, stalledDisk: true, }, @@ -658,7 +662,7 @@ func TestUpdateInstanceLastChecked(t *testing.T) { // Verify the instance we just updated satisfies the condition specified. instances, err := readInstancesByCondition(tt.conditionToCheck, nil, "") require.NoError(t, err) - var tabletAliases []string + var tabletAliases []*topodatapb.TabletAlias for _, instance := range instances { tabletAliases = append(tabletAliases, instance.InstanceAlias) } @@ -672,16 +676,16 @@ func TestUpdateInstanceLastChecked(t *testing.T) { func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { tests := []struct { name string - tabletAlias string + tabletAlias *topodatapb.TabletAlias conditionToCheck string }{ { name: "Verify updated last checked", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, conditionToCheck: "last_attempted_check >= DATETIME('now', '-30 second')", }, { - name: "Verify no error on unknown tablet", - tabletAlias: "unknown tablet", + name: "Verify no error on nil tablet", + tabletAlias: nil, }, } @@ -703,7 +707,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { // Verify the instance we just updated satisfies the condition specified. instances, err := readInstancesByCondition(tt.conditionToCheck, nil, "") require.NoError(t, err) - var tabletAliases []string + var tabletAliases []*topodatapb.TabletAlias for _, instance := range instances { tabletAliases = append(tabletAliases, instance.InstanceAlias) } @@ -717,28 +721,31 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) { func TestForgetInstanceAndInstanceIsForgotten(t *testing.T) { tests := []struct { name string - tabletAlias string + tabletAlias *topodatapb.TabletAlias errExpected string instanceForgotten bool - tabletsExpected []string + tabletsExpected []*topodatapb.TabletAlias }{ { - name: "Unknown tablet", - tabletAlias: "unknown-tablet", - errExpected: "ForgetInstance(): tablet unknown-tablet not found", - instanceForgotten: true, - tabletsExpected: []string{"zone1-0000000100", "zone1-0000000101", "zone1-0000000112", "zone2-0000000200"}, - }, { name: "Empty tabletAlias", - tabletAlias: "", + tabletAlias: nil, errExpected: "ForgetInstance(): empty tabletAlias", instanceForgotten: false, - tabletsExpected: []string{"zone1-0000000100", "zone1-0000000101", "zone1-0000000112", "zone2-0000000200"}, + tabletsExpected: []*topodatapb.TabletAlias{ + {Cell: "zone1", Uid: 100}, + {Cell: "zone1", Uid: 101}, + {Cell: "zone1", Uid: 112}, + {Cell: "zone2", Uid: 200}, + }, }, { name: "Success", - tabletAlias: "zone1-0000000112", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 112}, instanceForgotten: true, - tabletsExpected: []string{"zone1-0000000100", "zone1-0000000101", "zone2-0000000200"}, + tabletsExpected: []*topodatapb.TabletAlias{ + {Cell: "zone1", Uid: 100}, + {Cell: "zone1", Uid: 101}, + {Cell: "zone2", Uid: 200}, + }, }, } @@ -771,7 +778,7 @@ func TestForgetInstanceAndInstanceIsForgotten(t *testing.T) { instances, err := readInstancesByCondition("1=1", nil, "") require.NoError(t, err) - var tabletAliases []string + var tabletAliases []*topodatapb.TabletAlias for _, instance := range instances { tabletAliases = append(tabletAliases, instance.InstanceAlias) } @@ -988,14 +995,14 @@ func TestDetectErrantGTIDs(t *testing.T) { require.NoError(t, err) if tt.primaryInstance != nil { - tt.primaryInstance.InstanceAlias = topoproto.TabletAliasString(primaryTablet.Alias) + tt.primaryInstance.InstanceAlias = primaryTablet.Alias err = SaveTablet(primaryTablet) require.NoError(t, err) err = WriteInstance(tt.primaryInstance, true, nil) require.NoError(t, err) } - tt.instance.InstanceAlias = topoproto.TabletAliasString(tablet.Alias) + tt.instance.InstanceAlias = tablet.Alias err = detectErrantGTIDs(tt.instance, tablet) if tt.wantErr { require.Error(t, err) @@ -1027,7 +1034,7 @@ func TestPrimaryErrantGTIDs(t *testing.T) { instance := &Instance{ SourceHost: "", ExecutedGtidSet: "230ea8ea-81e3-11e4-972a-e25ec4bd140a:1-10589,8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-34,316d193c-70e5-11e5-adb2-ecf4bb2262ff:1-341", - InstanceAlias: topoproto.TabletAliasString(tablet.Alias), + InstanceAlias: tablet.Alias, } // Save shard record for the primary tablet. diff --git a/go/vt/vtorc/inst/instance_test.go b/go/vt/vtorc/inst/instance_test.go index 9ca2f243999..cd3d226ef3e 100644 --- a/go/vt/vtorc/inst/instance_test.go +++ b/go/vt/vtorc/inst/instance_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vtorc/config" ) @@ -28,7 +29,10 @@ func init() { config.MarkConfigurationLoaded() } -var instance1 = Instance{InstanceAlias: "zone1-100"} +var instance1 = Instance{InstanceAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, +}} func TestReplicationThreads(t *testing.T) { { @@ -41,7 +45,7 @@ func TestReplicationThreads(t *testing.T) { require.True(t, instance1.ReplicationThreadsStopped()) } { - i := Instance{InstanceAlias: "zone1-100", ReplicationIOThreadState: ReplicationThreadStateNoThread, ReplicationSQLThreadState: ReplicationThreadStateNoThread} + i := Instance{InstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, ReplicationIOThreadState: ReplicationThreadStateNoThread, ReplicationSQLThreadState: ReplicationThreadStateNoThread} require.False(t, i.ReplicationThreadsExist()) } } diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index 233c592965a..71acc52ee84 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -22,6 +22,7 @@ import ( "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/external/golib/sqlutils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/db" @@ -43,7 +44,11 @@ func ReadShardNames(keyspaceName string) (shardNames []string, err error) { } // ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp. -func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp time.Time, err error) { +func ReadShardPrimaryInformation(keyspaceName, shardName string) ( + primaryAlias *topodatapb.TabletAlias, + primaryTimestamp time.Time, + err error, +) { if err = topo.ValidateKeyspaceName(keyspaceName); err != nil { return } @@ -60,9 +65,14 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s AND shard = ?` args := sqlutils.Args(keyspaceName, shardName) shardFound := false - err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { + err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) (err error) { shardFound = true - primaryAlias = row.GetString("primary_alias") + if primaryAliasStr := row.GetString("primary_alias"); primaryAliasStr != "" { + primaryAlias, err = topoproto.ParseTabletAlias(primaryAliasStr) + if err != nil { + return err + } + } primaryTimestamp = row.GetTime("primary_timestamp") return nil }) diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index f5dede10a48..a1b2b024dfe 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -41,7 +41,7 @@ func TestSaveReadAndDeleteShard(t *testing.T) { keyspaceName string shardName string shard *topodatapb.Shard - primaryAliasWanted string + primaryAliasWanted *topodatapb.TabletAlias primaryTimestampWanted time.Time err string }{ @@ -56,8 +56,8 @@ func TestSaveReadAndDeleteShard(t *testing.T) { }, PrimaryTermStartTime: protoutil.TimeToProto(timeToUse.Add(1 * time.Hour)), }, + primaryAliasWanted: &topodatapb.TabletAlias{Cell: "zone1", Uid: 301}, primaryTimestampWanted: timeToUse.Add(1 * time.Hour).UTC(), - primaryAliasWanted: "zone1-0000000301", }, { name: "Success with empty primary alias", keyspaceName: "ks1", @@ -65,8 +65,8 @@ func TestSaveReadAndDeleteShard(t *testing.T) { shard: &topodatapb.Shard{ PrimaryTermStartTime: protoutil.TimeToProto(timeToUse), }, + primaryAliasWanted: nil, primaryTimestampWanted: timeToUse.UTC(), - primaryAliasWanted: "", }, { name: "Success with empty primary term start time", keyspaceName: "ks1", @@ -77,8 +77,8 @@ func TestSaveReadAndDeleteShard(t *testing.T) { Uid: 301, }, }, + primaryAliasWanted: &topodatapb.TabletAlias{Cell: "zone1", Uid: 301}, primaryTimestampWanted: time.Time{}, - primaryAliasWanted: "zone1-0000000301", }, { name: "No shard found", @@ -102,7 +102,7 @@ func TestSaveReadAndDeleteShard(t *testing.T) { return } require.NoError(t, err) - require.EqualValues(t, tt.primaryAliasWanted, shardPrimaryAlias) + testRequireTabletAliasEqual(t, tt.primaryAliasWanted, shardPrimaryAlias) require.EqualValues(t, tt.primaryTimestampWanted, primaryTimestamp) // ReadShardNames diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index ef4863287d3..bbda9d1393c 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -24,7 +24,6 @@ import ( "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/external/golib/sqlutils" - replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" @@ -51,15 +50,14 @@ func fullStatus(tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error } // ReadTablet reads the vitess tablet record. -func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) { +func ReadTablet(tabletAlias *topodatapb.TabletAlias) (*topodatapb.Tablet, error) { query := `SELECT info FROM vitess_tablet WHERE - alias = ? - ` - args := sqlutils.Args(tabletAlias) + alias = ?` + args := sqlutils.Args(topoproto.TabletAliasString(tabletAlias)) tablet := &topodatapb.Tablet{} opts := prototext.UnmarshalOptions{DiscardUnknown: true} err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { diff --git a/go/vt/vtorc/inst/tablet_dao_test.go b/go/vt/vtorc/inst/tablet_dao_test.go index 67fdf1a3227..9ab8cad2ed2 100644 --- a/go/vt/vtorc/inst/tablet_dao_test.go +++ b/go/vt/vtorc/inst/tablet_dao_test.go @@ -12,6 +12,10 @@ import ( "vitess.io/vitess/go/vt/vtorc/db" ) +func testRequireTabletAliasEqual(t *testing.T, expected, got *topodatapb.TabletAlias) { + require.Equal(t, topoproto.TabletAliasString(expected), topoproto.TabletAliasString(got)) +} + func TestSaveAndReadTablet(t *testing.T) { // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. defer func() { @@ -20,14 +24,14 @@ func TestSaveAndReadTablet(t *testing.T) { tests := []struct { name string - tabletAlias string + tabletAlias *topodatapb.TabletAlias tablet *topodatapb.Tablet tabletWanted *topodatapb.Tablet err string }{ { name: "Success with primary type", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, tablet: &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: "zone1", @@ -47,7 +51,7 @@ func TestSaveAndReadTablet(t *testing.T) { tabletWanted: nil, }, { name: "Success with replica type", - tabletAlias: "zone1-0000000100", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, tablet: &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: "zone1", @@ -63,7 +67,7 @@ func TestSaveAndReadTablet(t *testing.T) { tabletWanted: nil, }, { name: "No tablet found", - tabletAlias: "zone1-190734", + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 190734}, tablet: nil, tabletWanted: nil, err: ErrTabletAliasNil.Error(), @@ -87,7 +91,7 @@ func TestSaveAndReadTablet(t *testing.T) { } require.NoError(t, err) require.True(t, topotools.TabletEquality(tt.tabletWanted, readTable)) - require.Equal(t, tt.tabletAlias, topoproto.TabletAliasString(readTable.Alias)) + testRequireTabletAliasEqual(t, tt.tabletAlias, readTable.Alias) }) } } diff --git a/go/vt/vtorc/logic/discovery_queue.go b/go/vt/vtorc/logic/discovery_queue.go index 46f5527b196..33b47397cc5 100644 --- a/go/vt/vtorc/logic/discovery_queue.go +++ b/go/vt/vtorc/logic/discovery_queue.go @@ -30,13 +30,15 @@ import ( "time" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" ) // queueItem represents an item in the DiscoveryQueue. type queueItem struct { - Key string - PushedAt time.Time + TabletAlias *topodatapb.TabletAlias + PushedAt time.Time } // DiscoveryQueue is an ordered queue with deduplication. @@ -56,13 +58,14 @@ func NewDiscoveryQueue() *DiscoveryQueue { // setKeyCheckEnqueued returns true if a key is already enqueued, if // not the key will be marked as enqueued and false is returned. -func (q *DiscoveryQueue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { +func (q *DiscoveryQueue) setKeyCheckEnqueued(tabletAlias *topodatapb.TabletAlias) (alreadyEnqueued bool) { q.mu.Lock() defer q.mu.Unlock() - _, alreadyEnqueued = q.enqueued[key] + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + _, alreadyEnqueued = q.enqueued[tabletAliasString] if !alreadyEnqueued { - q.enqueued[key] = struct{}{} + q.enqueued[tabletAliasString] = struct{}{} } return alreadyEnqueued } @@ -75,36 +78,39 @@ func (q *DiscoveryQueue) QueueLen() int { return len(q.enqueued) } -// Push enqueues a key if it is not on a queue and is not being +// Push enqueues a tablet alias if it is not on a queue and is not being // processed; silently returns otherwise. -func (q *DiscoveryQueue) Push(key string) { - if q.setKeyCheckEnqueued(key) { +func (q *DiscoveryQueue) Push(tabletAlias *topodatapb.TabletAlias) { + if q.setKeyCheckEnqueued(tabletAlias) { return } q.queue <- queueItem{ - Key: key, - PushedAt: time.Now(), + TabletAlias: tabletAlias, + PushedAt: time.Now(), } } -// Consume fetches a key to process; blocks if queue is empty. +// Consume fetches a tablet alias to process; blocks if queue is empty. // Release must be called once after Consume. -func (q *DiscoveryQueue) Consume() string { +func (q *DiscoveryQueue) Consume() *topodatapb.TabletAlias { item := <-q.queue timeOnQueue := time.Since(item.PushedAt) if timeOnQueue > config.GetInstancePollTime() { - log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) + log.Warningf("tablet %v spent %.4fs waiting on a discovery queue", + topoproto.TabletAliasString(item.TabletAlias), + timeOnQueue.Seconds(), + ) } - return item.Key + return item.TabletAlias } -// Release removes a key from a list of being processed keys -// which allows that key to be pushed into the queue again. -func (q *DiscoveryQueue) Release(key string) { +// Release removes a tablet alias from a list of being processed aliases +// which allows that tablet to be pushed into the queue again. +func (q *DiscoveryQueue) Release(tabletAlias *topodatapb.TabletAlias) { q.mu.Lock() defer q.mu.Unlock() - delete(q.enqueued, key) + delete(q.enqueued, topoproto.TabletAliasString(tabletAlias)) } diff --git a/go/vt/vtorc/logic/discovery_queue_test.go b/go/vt/vtorc/logic/discovery_queue_test.go index 39457e72d89..d5e3d5916ef 100644 --- a/go/vt/vtorc/logic/discovery_queue_test.go +++ b/go/vt/vtorc/logic/discovery_queue_test.go @@ -17,44 +17,52 @@ limitations under the License. package logic import ( - "strconv" "testing" "github.com/stretchr/testify/require" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" ) func TestDiscoveryQueue(t *testing.T) { q := NewDiscoveryQueue() require.Zero(t, q.QueueLen()) + tabletAlias := &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 1, + } + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + // Push - q.Push(t.Name()) + q.Push(tabletAlias) require.Equal(t, 1, q.QueueLen()) - _, found := q.enqueued[t.Name()] + _, found := q.enqueued[tabletAliasString] require.True(t, found) // Push duplicate - q.Push(t.Name()) + q.Push(tabletAlias) require.Equal(t, 1, q.QueueLen()) // Consume - require.Equal(t, t.Name(), q.Consume()) + require.Equal(t, tabletAlias, q.Consume()) require.Equal(t, 1, q.QueueLen()) - _, found = q.enqueued[t.Name()] + _, found = q.enqueued[tabletAliasString] require.True(t, found) // Release - q.Release(t.Name()) + q.Release(tabletAlias) require.Zero(t, q.QueueLen()) - _, found = q.enqueued[t.Name()] + _, found = q.enqueued[tabletAliasString] require.False(t, found) } type testDiscoveryQueue interface { QueueLen() int - Push(string) - Consume() string - Release(string) + Push(*topodatapb.TabletAlias) + Consume() *topodatapb.TabletAlias + Release(*topodatapb.TabletAlias) } func BenchmarkDiscoveryQueues(b *testing.B) { @@ -69,7 +77,10 @@ func BenchmarkDiscoveryQueues(b *testing.B) { b.Run(test.name, func(b *testing.B) { for i := 0; i < b.N; i++ { for i := 0; i < 1000; i++ { - q.Push(b.Name() + strconv.Itoa(i)) + q.Push(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: uint32(i), + }) } q.QueueLen() for i := 0; i < 1000; i++ { diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 08e6cf48814..b702b7d7c87 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -104,11 +104,11 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Verify that we only have ks1 and ks3 in vtorc's db. verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") - verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "") + verifyPrimaryAlias(t, "ks1", "-80", &topodatapb.TabletAlias{Cell: "zone_ks1", Uid: 100}, "") verifyKeyspaceInfo(t, "ks2", nil, "keyspace not found") - verifyPrimaryAlias(t, "ks2", "80-", "", "shard not found") + verifyPrimaryAlias(t, "ks2", "80-", nil, "shard not found") verifyKeyspaceInfo(t, "ks3", keyspaceSnapshot, "") - verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "") + verifyPrimaryAlias(t, "ks3", "80-", &topodatapb.TabletAlias{Cell: "zone_ks3", Uid: 101}, "") verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found") // Set clusters to watch to watch all keyspaces @@ -121,13 +121,13 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Verify that all the keyspaces are correctly reloaded verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "") - verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "") + verifyPrimaryAlias(t, "ks1", "-80", &topodatapb.TabletAlias{Cell: "zone_ks1", Uid: 100}, "") verifyKeyspaceInfo(t, "ks2", keyspaceDurabilitySemiSync, "") - verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "") + verifyPrimaryAlias(t, "ks2", "80-", &topodatapb.TabletAlias{Cell: "zone_ks2", Uid: 101}, "") verifyKeyspaceInfo(t, "ks3", keyspaceSnapshot, "") - verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "") + verifyPrimaryAlias(t, "ks3", "80-", &topodatapb.TabletAlias{Cell: "zone_ks3", Uid: 101}, "") verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") - verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") + verifyPrimaryAlias(t, "ks4", "80-", &topodatapb.TabletAlias{Cell: "zone_ks4", Uid: 101}, "") } func TestRefreshKeyspace(t *testing.T) { @@ -261,7 +261,7 @@ func TestRefreshShard(t *testing.T) { keyspaceName string shardName string shard *topodatapb.Shard - primaryAliasWanted string + primaryAliasWanted *topodatapb.TabletAlias err string }{ { @@ -274,14 +274,14 @@ func TestRefreshShard(t *testing.T) { Uid: 302, }, }, - primaryAliasWanted: "zone1-0000000302", + primaryAliasWanted: &topodatapb.TabletAlias{Cell: "zone1", Uid: 302}, err: "", }, { name: "Success with empty primaryAlias", keyspaceName: "ks1", shardName: "-80", shard: &topodatapb.Shard{}, - primaryAliasWanted: "", + primaryAliasWanted: nil, err: "", }, { name: "No shard found", @@ -318,7 +318,7 @@ func TestRefreshShard(t *testing.T) { } // verifyPrimaryAlias verifies the correct primary alias is stored in the database for the given keyspace shard. -func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAliasWanted string, errString string) { +func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAliasWanted *topodatapb.TabletAlias, errString string) { primaryAlias, _, err := inst.ReadShardPrimaryInformation(keyspaceName, shardName) if errString != "" { require.ErrorContains(t, err, errString) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 4e3ca725455..9770f9e3f08 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "slices" "strings" "sync" "time" @@ -236,13 +235,13 @@ func getAllTablets(ctx context.Context, cells []string) (tabletsByCell map[strin // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while func refreshAllTablets(ctx context.Context) error { - return refreshTabletsUsing(ctx, func(tabletAlias string) { + return refreshTabletsUsing(ctx, func(tabletAlias *topodatapb.TabletAlias) { DiscoverInstance(tabletAlias, false /* forceDiscovery */) }, false /* forceRefresh */) } // refreshTabletsUsing refreshes tablets using a provided loader. -func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { +func refreshTabletsUsing(ctx context.Context, loader func(*topodatapb.TabletAlias), forceRefresh bool) error { // Get all cells. cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cellsCancel() @@ -277,7 +276,7 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f }() // Refresh the filtered tablets and forget stale tablets. - query := "select alias from vitess_tablet where cell = ?" + query := "SELECT alias FROM vitess_tablet WHERE cell = ?" args := sqlutils.Args(cell) refreshTablets(matchedTablets, query, args, loader, forceRefresh, nil) } @@ -288,10 +287,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f // forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records) // for a given shard. This function is meant to be called before or after a cluster-wide operation that we know will // change the replication information for the entire cluster drastically enough to warrant a full forceful refresh -func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string, tabletsToIgnore []string) { +func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string, tabletsToIgnore []*topodatapb.TabletAlias) { refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(tabletAlias string) { + refreshTabletsInKeyspaceShard(refreshCtx, keyspace, shard, func(tabletAlias *topodatapb.TabletAlias) { DiscoverInstance(tabletAlias, true) }, true, tabletsToIgnore) } @@ -300,32 +299,32 @@ func forceRefreshAllTabletsInShard(ctx context.Context, keyspace, shard string, // of the given keyspace-shard. func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) { log.Infof("refresh of tablet records of shard - %v/%v", keyspace, shard) - refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias string) { + refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(*topodatapb.TabletAlias) { // No-op // We only want to refresh the tablet information for the given shard }, false, nil) } -func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { +func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(*topodatapb.TabletAlias), forceRefresh bool, tabletsToIgnore []*topodatapb.TabletAlias) { tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard) if err != nil { log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err) return } - query := "select alias from vitess_tablet where keyspace = ? and shard = ?" + query := "SELECT alias FROM vitess_tablet WHERE keyspace = ? AND shard = ?" args := sqlutils.Args(keyspace, shard) refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore) } -func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) { +func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(*topodatapb.TabletAlias), forceRefresh bool, tabletsToIgnore []*topodatapb.TabletAlias) { // Discover new tablets. - latestInstances := make(map[string]bool) + latestInstances := make(map[string]bool, len(tablets)) var wg sync.WaitGroup for _, tabletInfo := range tablets { tablet := tabletInfo.Tablet tabletAliasString := topoproto.TabletAliasString(tablet.Alias) latestInstances[tabletAliasString] = true - old, err := inst.ReadTablet(tabletAliasString) + old, err := inst.ReadTablet(tablet.Alias) if err != nil && err != inst.ErrTabletAliasNil { log.Error(err) continue @@ -337,24 +336,29 @@ func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader log.Error(err) continue } + wg.Add(1) go func() { defer wg.Done() - if slices.Contains(tabletsToIgnore, topoproto.TabletAliasString(tablet.Alias)) { - return + for _, tabletToIgnore := range tabletsToIgnore { + if topoproto.TabletAliasEqual(tabletToIgnore, tablet.Alias) { + return + } } - loader(tabletAliasString) + loader(tablet.Alias) }() log.Infof("Discovered: %v", tablet) } wg.Wait() // Forget tablets that were removed. - var toForget []string + toForget := make([]*topodatapb.TabletAlias, 0) err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { - tabletAlias := row.GetString("alias") - if !latestInstances[tabletAlias] { - toForget = append(toForget, tabletAlias) + tabletAliasString := row.GetString("alias") + if !latestInstances[tabletAliasString] { + if tabletAlias, err := topoproto.ParseTabletAlias(tabletAliasString); err == nil { + toForget = append(toForget, tabletAlias) + } } return nil }) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 5a382457378..b0a533e61c9 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -333,6 +333,9 @@ func TestRefreshTabletsInKeyspaceShard(t *testing.T) { db.ClearVTOrcDatabase() }() + // Init forgetAliases cache in go/vt/vtorc/inst + inst.InitializeForgetAliasesCache() + // Create a memory topo-server and create the keyspace and shard records ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -365,7 +368,7 @@ func TestRefreshTabletsInKeyspaceShard(t *testing.T) { t.Run("call refreshTabletsInKeyspaceShard again - force refresh with ignore", func(t *testing.T) { // We expect 2 tablets to be refreshed since we requested force refresh, but we are ignoring one of them. - verifyRefreshTabletsInKeyspaceShard(t, true, 2, tablets, []string{topoproto.TabletAliasString(tab100.Alias)}) + verifyRefreshTabletsInKeyspaceShard(t, true, 2, tablets, []*topodatapb.TabletAlias{tab100.Alias}) }) t.Run("tablet shutdown removes mysql hostname and port. We shouldn't forget the tablet", func(t *testing.T) { @@ -420,7 +423,7 @@ func TestRefreshTabletsInKeyspaceShard(t *testing.T) { }) tab100.MysqlPort = 100 // We refresh once more to ensure we don't affect the next tests since we've made a change again. - refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias string) {}, false, nil) + refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias *topodatapb.TabletAlias) {}, false, nil) }() // Let's assume tab100 restarted on a different pod. This would change its tablet hostname and port _, err = ts.UpdateTabletFields(context.Background(), tab100.Alias, func(tablet *topodatapb.Tablet) error { @@ -442,7 +445,7 @@ func TestRefreshTabletsInKeyspaceShard(t *testing.T) { }) tab101.Type = topodatapb.TabletType_REPLICA // We refresh once more to ensure we don't affect the next tests since we've made a change again. - refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias string) {}, false, nil) + refreshTabletsInKeyspaceShard(ctx, keyspace, shard, func(tabletAlias *topodatapb.TabletAlias) {}, false, nil) }() // A replica tablet can be converted to drained type if it has an errant GTID. _, err = ts.UpdateTabletFields(context.Background(), tab101.Alias, func(tablet *topodatapb.Tablet) error { @@ -521,11 +524,11 @@ func TestShardPrimary(t *testing.T) { // verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that // the number of instances refreshed matches the parameter and all the tablets match the ones provided -func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) { +func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []*topodatapb.TabletAlias) { var instancesRefreshed atomic.Int32 instancesRefreshed.Store(0) // call refreshTabletsInKeyspaceShard while counting all the instances that are refreshed - refreshTabletsInKeyspaceShard(context.Background(), keyspace, shard, func(string) { + refreshTabletsInKeyspaceShard(context.Background(), keyspace, shard, func(*topodatapb.TabletAlias) { instancesRefreshed.Add(1) }, forceRefresh, tabletsToIgnore) // Verify that all the tablets are present in the database @@ -541,13 +544,12 @@ func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instan // is the same as the one provided or reading it gives the same error as expected func verifyTabletInfo(t *testing.T, tabletWanted *topodatapb.Tablet, errString string) { t.Helper() - tabletAlias := topoproto.TabletAliasString(tabletWanted.Alias) - tablet, err := inst.ReadTablet(tabletAlias) + tablet, err := inst.ReadTablet(tabletWanted.Alias) if errString != "" { assert.EqualError(t, err, errString) } else { assert.NoError(t, err) - assert.EqualValues(t, tabletAlias, topoproto.TabletAliasString(tablet.Alias)) + assert.Equal(t, topoproto.TabletAliasString(tabletWanted.Alias), topoproto.TabletAliasString(tablet.Alias)) diff := cmp.Diff(tablet, tabletWanted, cmp.Comparer(proto.Equal)) assert.Empty(t, diff) } @@ -567,18 +569,18 @@ func verifyTabletCount(t *testing.T, countWanted int) { func TestGetLockAction(t *testing.T) { tests := []struct { - analysedInstance string + analysedInstance *topodatapb.TabletAlias code inst.AnalysisCode want string }{ { - analysedInstance: "zone1-100", + analysedInstance: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, code: inst.DeadPrimary, - want: "VTOrc Recovery for DeadPrimary on zone1-100", + want: "VTOrc Recovery for DeadPrimary on zone1-0000000100", }, { - analysedInstance: "zone1-200", + analysedInstance: &topodatapb.TabletAlias{Cell: "zone1", Uid: 200}, code: inst.ReplicationStopped, - want: "VTOrc Recovery for ReplicationStopped on zone1-200", + want: "VTOrc Recovery for ReplicationStopped on zone1-0000000200", }, } for _, tt := range tests { diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index e05462e0a4f..3ef72161958 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -151,7 +151,7 @@ const ( type TopologyRecovery struct { ID int64 AnalysisEntry inst.DetectionAnalysis - SuccessorAlias string + SuccessorAlias *topodatapb.TabletAlias IsSuccessful bool AllErrors []string RecoveryStartTimestamp string @@ -206,8 +206,8 @@ func initializeTopologyRecoveryPostConfiguration() { config.WaitForConfigurationToBeLoaded() } -func getLockAction(analysedInstance string, code inst.AnalysisCode) string { - return fmt.Sprintf("VTOrc Recovery for %v on %v", code, analysedInstance) +func getLockAction(tabletAlias *topodatapb.TabletAlias, code inst.AnalysisCode) string { + return fmt.Sprintf("VTOrc Recovery for %v on %v", code, topoproto.TabletAliasString(tabletAlias)) } // LockShard locks the keyspace-shard preventing others from performing conflicting actions. @@ -345,7 +345,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.DetectionAn } if ev != nil && ev.NewPrimary != nil { - promotedReplica, _, _ = inst.ReadInstance(topoproto.TabletAliasString(ev.NewPrimary.Alias)) + promotedReplica, _, _ = inst.ReadInstance(ev.NewPrimary.Alias) } postErsCompletion(topologyRecovery, analysisEntry, recoveryName, promotedReplica) return true, topologyRecovery, err @@ -424,8 +424,7 @@ func restartDirectReplicas(ctx context.Context, analysisEntry *inst.DetectionAna // Find the primary tablet for semi-sync policy determination var primaryTablet *topodatapb.Tablet for _, tabletInfo := range tablets { - tabletAlias := topoproto.TabletAliasString(tabletInfo.Alias) - if tabletAlias == analysisEntry.AnalyzedInstanceAlias { + if topoproto.TabletAliasEqual(tabletInfo.Alias, analysisEntry.AnalyzedInstanceAlias) { primaryTablet = tabletInfo.Tablet break } @@ -448,22 +447,22 @@ func restartDirectReplicas(ctx context.Context, analysisEntry *inst.DetectionAna } tabletInfo := tablets[tabletIndex] tablet := tabletInfo.Tablet - tabletAlias := topoproto.TabletAliasString(tablet.Alias) + tabletAliasString := topoproto.TabletAliasString(tablet.Alias) // Skip the primary itself - if tabletAlias == analysisEntry.AnalyzedInstanceAlias { + if topoproto.TabletAliasEqual(tablet.Alias, analysisEntry.AnalyzedInstanceAlias) { continue } - if err := urgentOperations.Add(tabletAlias, true, cache.DefaultExpiration); err != nil { + if err := urgentOperations.Add(tabletAliasString, true, cache.DefaultExpiration); err != nil { // Rate limit interval has not passed yet continue } // Read the instance to check replication source - instance, found, err := inst.ReadInstance(tabletAlias) + instance, found, err := inst.ReadInstance(tablet.Alias) if err != nil || !found { - logger.Warningf("Could not read instance information for %s: %v", tabletAlias, err) + logger.Warningf("Could not read instance information for %s: %v", tabletAliasString, err) continue } if instance.ReplicationDepth != 1 { @@ -473,11 +472,11 @@ func restartDirectReplicas(ctx context.Context, analysisEntry *inst.DetectionAna restartExpected++ eg.Go(func() error { - logger.Infof("Restarting replication on direct replica %s", tabletAlias) - _ = AuditTopologyRecovery(topologyRecovery, "Restarting replication on direct replica "+tabletAlias) + logger.Infof("Restarting replication on direct replica %s", tabletAliasString) + _ = AuditTopologyRecovery(topologyRecovery, "Restarting replication on direct replica "+tabletAliasString) if err := tmc.StopReplication(ctx, tablet); err != nil { - logger.Errorf("Failed to stop replication on %s: %v", tabletAlias, err) + logger.Errorf("Failed to stop replication on %s: %v", tabletAliasString, err) return err } @@ -485,10 +484,10 @@ func restartDirectReplicas(ctx context.Context, analysisEntry *inst.DetectionAna semiSync := policy.IsReplicaSemiSync(durabilityPolicy, primaryTablet, tablet) if err := tmc.StartReplication(ctx, tablet, semiSync); err != nil { - logger.Errorf("Failed to start replication on %s: %v", tabletAlias, err) + logger.Errorf("Failed to start replication on %s: %v", tabletAliasString, err) return err } - logger.Infof("Successfully restarted replication on %s", tabletAlias) + logger.Infof("Successfully restarted replication on %s", tabletAliasString) restartPerformed.Add(1) return nil }) @@ -721,23 +720,25 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode) analysisEntry.IsActionableRecovery = isActionableRecovery + analyzedInstanceAliasString := topoproto.TabletAliasString(analysisEntry.AnalyzedInstanceAlias) if recoverySkipCode != RecoverySkipNone { skipReason := recoverySkipCode.String() logger.Warningf("Skipping recovery for problem: %+v, recovery: %+v, reason: %+v, aborting recovery", analysisEntry.Analysis, skipReason, recoveryName) recoveriesSkippedCounter.Add(append(recoveryLabels, skipReason), 1) + // Unhandled problem type if analysisEntry.Analysis != inst.NoProblem { - if util.ClearToLog("executeCheckAndRecoverFunction", analysisEntry.AnalyzedInstanceAlias) { + if util.ClearToLog("executeCheckAndRecoverFunction", analyzedInstanceAliasString) { logger.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: tablet: %+v", - analysisEntry.AnalyzedInstanceAlias) + analyzedInstanceAliasString) } } return nil } // we have a recovery function; its execution still depends on filters if not disabled. - if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: detection", analysisEntry.AnalyzedInstanceAlias) { - logger.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) + if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: detection", analyzedInstanceAliasString) { + logger.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analyzedInstanceAliasString, isActionableRecovery) } // At this point we have validated there's a failure scenario for which we have a recovery path. @@ -754,7 +755,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err logger.Errorf("Unable to determine if recovery is disabled globally, still attempting to recover: %v", err) } else if recoveryDisabledGlobally { logger.Infof("CheckAndRecover: Tablet: %+v: NOT Recovering host (disabled globally)", - analysisEntry.AnalyzedInstanceAlias) + analyzedInstanceAliasString) recoveriesSkippedCounter.Add(append(recoveryLabels, RecoverySkipGlobalDisabled.String()), 1) return err @@ -789,7 +790,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err // changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure // that the data that we use now is up-to-date. if isActionableRecovery { - logger.Infof("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + logger.Infof("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analyzedInstanceAliasString) // The first step we have to do is refresh the keyspace and shard information // This is required to know if the durability policies have changed or not // If they have, then recoveries like ReplicaSemiSyncMustNotBeSet, etc won't be valid anymore. @@ -804,7 +805,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err // of a shard because a new tablet could have been promoted, and we need to have this visibility // before we run a shard-wide operation of our own. if isShardWideRecovery(checkAndRecoverFunctionCode) { - var tabletsToIgnore []string + tabletsToIgnore := make([]*topodatapb.TabletAlias, 0) if checkAndRecoverFunctionCode == recoverDeadPrimaryFunc { tabletsToIgnore = append(tabletsToIgnore, analysisEntry.AnalyzedInstanceAlias) } @@ -827,15 +828,14 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err primaryTablet, err := shardPrimary(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) if err != nil { logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while finding the shard primary: %v", - analysisEntry.AnalyzedInstanceAlias, err) + analyzedInstanceAliasString, err) return err } - primaryTabletAlias := topoproto.TabletAliasString(primaryTablet.Alias) // We can skip the refresh if we know the tablet we are looking at is the primary tablet. // This would be the case for PrimaryHasPrimary recovery. We don't need to refresh the same tablet twice. - if analysisEntry.AnalyzedInstanceAlias != primaryTabletAlias { + if !topoproto.TabletAliasEqual(analysisEntry.AnalyzedInstanceAlias, primaryTablet.Alias) { logger.Info("Discovering primary instance") - DiscoverInstance(primaryTabletAlias, true) + DiscoverInstance(primaryTablet.Alias, true) } } alreadyFixed, err := checkIfAlreadyFixed(analysisEntry) @@ -845,15 +845,15 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err return err } if alreadyFixed { - logger.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + logger.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analyzedInstanceAliasString) recoveriesSkippedCounter.Add(append(recoveryLabels, RecoverySkipStaleAnalysis.String()), 1) return nil } } // Actually attempt recovery: - if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceAlias) { - logger.Infof("executeCheckAndRecoverFunction: proceeding with recovery on %+v; isRecoverable?: %+v", analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) + if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analyzedInstanceAliasString) { + logger.Infof("executeCheckAndRecoverFunction: proceeding with recovery on %+v; isRecoverable?: %+v", analyzedInstanceAliasString, isActionableRecovery) } recoveryAttempted, topologyRecovery, err := getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry, logger) if !recoveryAttempted { @@ -897,7 +897,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.DetectionAnalysis) (err // recheckPrimaryHealth check the health of the primary node. // It then checks whether, given the re-discovered primary health, the original recovery is still valid. // If not valid then it will abort the current analysis. -func recheckPrimaryHealth(analysisEntry *inst.DetectionAnalysis, recoveryLabels []string, discoveryFunc func(string, bool)) error { +func recheckPrimaryHealth(analysisEntry *inst.DetectionAnalysis, recoveryLabels []string, discoveryFunc func(*topodatapb.TabletAlias, bool)) error { originalAnalysisEntry := analysisEntry.Analysis primaryTabletAlias := analysisEntry.AnalyzedInstancePrimaryAlias @@ -935,7 +935,8 @@ func checkIfAlreadyFixed(analysisEntry *inst.DetectionAnalysis) (bool, error) { for _, entry := range analysisEntries { // If there is a analysis which has the same recovery required, then we should proceed with the recovery - if entry.AnalyzedInstanceAlias == analysisEntry.AnalyzedInstanceAlias && analysisEntriesHaveSameRecovery(analysisEntry, entry) { + tabletAliasesEqual := topoproto.TabletAliasEqual(entry.AnalyzedInstanceAlias, analysisEntry.AnalyzedInstanceAlias) + if tabletAliasesEqual && analysisEntriesHaveSameRecovery(analysisEntry, entry) { return false, nil } } @@ -961,7 +962,7 @@ func CheckAndRecover() { if e.Analysis != inst.NoProblem { names := [...]string{ string(e.Analysis), - e.AnalyzedInstanceAlias, + topoproto.TabletAliasString(e.AnalyzedInstanceAlias), e.AnalyzedKeyspace, e.AnalyzedShard, } @@ -993,10 +994,11 @@ func CheckAndRecover() { func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.DetectionAnalysis, promotedReplica *inst.Instance) { if promotedReplica != nil { - message := fmt.Sprintf("promoted replica: %+v", promotedReplica.InstanceAlias) + tabletAliasString := topoproto.TabletAliasString(promotedReplica.InstanceAlias) + message := fmt.Sprintf("promoted replica: %+v", tabletAliasString) _ = AuditTopologyRecovery(topologyRecovery, message) _ = inst.AuditOperation(string(analysisEntry.Analysis), analysisEntry.AnalyzedInstanceAlias, message) - _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%+v: successfully promoted %+v", analysisEntry.Analysis, promotedReplica.InstanceAlias)) + _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%+v: successfully promoted %+v", analysisEntry.Analysis, tabletAliasString)) } } @@ -1046,7 +1048,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.DetectionAnalysis, ) if ev != nil && ev.NewPrimary != nil { - promotedReplica, _, _ = inst.ReadInstance(topoproto.TabletAliasString(ev.NewPrimary.Alias)) + promotedReplica, _, _ = inst.ReadInstance(ev.NewPrimary.Alias) } postPrsCompletion(topologyRecovery, analysisEntry, promotedReplica) return true, topologyRecovery, err diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index e7cd8710047..e7ded20f283 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -23,6 +23,7 @@ import ( "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" @@ -44,7 +45,7 @@ func InsertRecoveryDetection(analysisEntry *inst.DetectionAnalysis) error { ?, DATETIME('now') )`, - analysisEntry.AnalyzedInstanceAlias, + topoproto.TabletAliasString(analysisEntry.AnalyzedInstanceAlias), string(analysisEntry.Analysis), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, @@ -83,11 +84,11 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ? )`, sqlutils.NilIfZero(topologyRecovery.ID), - analysisEntry.AnalyzedInstanceAlias, + topoproto.TabletAliasString(analysisEntry.AnalyzedInstanceAlias), string(analysisEntry.Analysis), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, - analysisEntry.AnalyzedInstanceAlias, + topoproto.TabletAliasString(analysisEntry.AnalyzedInstanceAlias), analysisEntry.RecoveryId, ) if err != nil { @@ -145,7 +146,7 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { recovery_id = ? `, topologyRecovery.IsSuccessful, - topologyRecovery.SuccessorAlias, + topoproto.TabletAliasString(topologyRecovery.SuccessorAlias), strings.Join(topologyRecovery.AllErrors, "\n"), topologyRecovery.ID, ) @@ -174,8 +175,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topology_recovery %s ORDER BY recovery_id DESC - %s - `, + %s`, whereCondition, limit, ) @@ -187,12 +187,22 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.RecoveryEndTimestamp = m.GetString("end_recovery") topologyRecovery.IsSuccessful = m.GetBool("is_successful") - topologyRecovery.AnalysisEntry.AnalyzedInstanceAlias = m.GetString("alias") + var err error + topologyRecovery.AnalysisEntry.AnalyzedInstanceAlias, err = topoproto.ParseTabletAlias(m.GetString("alias")) + if err != nil { + return err + } + topologyRecovery.AnalysisEntry.Analysis = inst.AnalysisCode(m.GetString("analysis")) topologyRecovery.AnalysisEntry.AnalyzedKeyspace = m.GetString("keyspace") topologyRecovery.AnalysisEntry.AnalyzedShard = m.GetString("shard") - topologyRecovery.SuccessorAlias = m.GetString("successor_alias") + if successorAlias := m.GetString("successor_alias"); successorAlias != "" { + topologyRecovery.SuccessorAlias, err = topoproto.ParseTabletAlias(successorAlias) + if err != nil { + return err + } + } topologyRecovery.AllErrors = strings.Split(m.GetString("all_errors"), "\n") diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 938871d1db2..920605be1c6 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/external/golib/sqlutils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" @@ -40,7 +42,7 @@ func TestTopologyRecovery(t *testing.T) { }() detectionAnalysis := inst.DetectionAnalysis{ - AnalyzedInstanceAlias: "zone1-0000000101", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, TabletType: tab101.Type, AnalyzedKeyspace: keyspace, AnalyzedShard: shard, @@ -138,15 +140,15 @@ func TestInsertRecoveryDetection(t *testing.T) { defer func() { db.ClearVTOrcDatabase() }() - ra := &inst.DetectionAnalysis{ - AnalyzedInstanceAlias: "alias-1", + da := &inst.DetectionAnalysis{ + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 1}, AnalyzedKeyspace: keyspace, AnalyzedShard: shard, Analysis: inst.ClusterHasNoPrimary, } - err := InsertRecoveryDetection(ra) + err := InsertRecoveryDetection(da) require.NoError(t, err) - require.NotEqual(t, 0, ra.RecoveryId) + require.NotEqual(t, 0, da.RecoveryId) var rows []map[string]sqlutils.CellData err = db.QueryVTOrc("select * from recovery_detection", nil, func(rowMap sqlutils.RowMap) error { @@ -155,10 +157,12 @@ func TestInsertRecoveryDetection(t *testing.T) { }) require.NoError(t, err) require.Len(t, rows, 1) - require.EqualValues(t, ra.AnalyzedInstanceAlias, rows[0]["alias"].String) - require.EqualValues(t, ra.Analysis, rows[0]["analysis"].String) + tabletAlias, err := topoproto.ParseTabletAlias(rows[0]["alias"].String) + require.NoError(t, err) + require.EqualValues(t, da.AnalyzedInstanceAlias, tabletAlias) + require.EqualValues(t, da.Analysis, rows[0]["analysis"].String) require.EqualValues(t, keyspace, rows[0]["keyspace"].String) require.EqualValues(t, shard, rows[0]["shard"].String) - require.EqualValues(t, strconv.Itoa(int(ra.RecoveryId)), rows[0]["detection_id"].String) + require.EqualValues(t, strconv.Itoa(int(da.RecoveryId)), rows[0]["detection_id"].String) require.NotEqual(t, "", rows[0]["detection_timestamp"].String) } diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index 8f7765a390b..fbe2b9d8808 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/vt/external/golib/sqlutils" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" @@ -139,7 +138,7 @@ func TestElectNewPrimaryPanic(t *testing.T) { err = inst.SaveTablet(tablet) require.NoError(t, err) analysisEntry := &inst.DetectionAnalysis{ - AnalyzedInstanceAlias: topoproto.TabletAliasString(tablet.Alias), + AnalyzedInstanceAlias: tablet.Alias, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -189,11 +188,11 @@ func TestRecoveryRegistration(t *testing.T) { err = inst.SaveTablet(replica) require.NoError(t, err) primaryAnalysisEntry := inst.DetectionAnalysis{ - AnalyzedInstanceAlias: topoproto.TabletAliasString(primary.Alias), + AnalyzedInstanceAlias: primary.Alias, Analysis: inst.ReplicationStopped, } replicaAnalysisEntry := inst.DetectionAnalysis{ - AnalyzedInstanceAlias: topoproto.TabletAliasString(replica.Alias), + AnalyzedInstanceAlias: replica.Alias, Analysis: inst.DeadPrimary, } ctx, cancel := context.WithCancel(context.Background()) @@ -432,7 +431,7 @@ func TestRecheckPrimaryHealth(t *testing.T) { name: "analysis change", info: []*test.InfoForRecoveryAnalysis{{ TabletInfo: &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Alias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, Hostname: "localhost", Keyspace: "ks", Shard: "0", @@ -452,7 +451,7 @@ func TestRecheckPrimaryHealth(t *testing.T) { name: "analysis did not change", info: []*test.InfoForRecoveryAnalysis{{ TabletInfo: &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + Alias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, Hostname: "localhost", Keyspace: "ks", Shard: "0", @@ -471,7 +470,7 @@ func TestRecheckPrimaryHealth(t *testing.T) { CurrentTabletType: int(topodatapb.TabletType_PRIMARY), }, { TabletInfo: &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Alias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, Hostname: "localhost", Keyspace: "ks", Shard: "0", @@ -481,7 +480,7 @@ func TestRecheckPrimaryHealth(t *testing.T) { }, DurabilityPolicy: policy.DurabilityNone, PrimaryTabletInfo: &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 101}, + Alias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, }, LastCheckValid: 1, ReadOnly: 1, @@ -508,11 +507,11 @@ func TestRecheckPrimaryHealth(t *testing.T) { db.Db = test.NewTestDB([][]sqlutils.RowMap{rowMaps}) err := recheckPrimaryHealth(&inst.DetectionAnalysis{ - AnalyzedInstanceAlias: "zon1-0000000100", + AnalyzedInstanceAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, Analysis: inst.ReplicationStopped, AnalyzedKeyspace: "ks", AnalyzedShard: "0", - }, []string{"ks", "0", ""}, func(s string, b bool) { + }, []string{"ks", "0", ""}, func(*topodatapb.TabletAlias, bool) { // the implementation for DiscoverInstance is not required because we are mocking the db response. }) diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 518be508e72..7fefdd80973 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -28,7 +28,9 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/process" @@ -39,8 +41,8 @@ import ( // requested for discovery. It can be continuously updated // as discovery process progresses. var discoveryQueue *DiscoveryQueue -var snapshotDiscoveryKeys chan string -var snapshotDiscoveryKeysMutex sync.Mutex +var snapshotDiscoveryAliases chan *topodatapb.TabletAlias +var snapshotDiscoveryAliasesMutex sync.Mutex var hasReceivedSIGTERM int32 var ( @@ -59,7 +61,7 @@ var ( var recentDiscoveryOperationKeys *cache.Cache func init() { - snapshotDiscoveryKeys = make(chan string, 10) + snapshotDiscoveryAliases = make(chan *topodatapb.TabletAlias, 10) onMetricsTick(func() { discoveryQueueLengthGauge.Set(int64(discoveryQueue.QueueLen())) @@ -77,7 +79,7 @@ func closeVTOrc() { log.Infof("Starting VTOrc shutdown") atomic.StoreInt32(&hasReceivedSIGTERM, 1) // Poke other go routines to stop cleanly here ... - _ = inst.AuditOperation("shutdown", "", "Triggered via SIGTERM") + _ = inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM") // wait for the locks to be released waitForLocksRelease() ts.Close() @@ -130,9 +132,10 @@ func handleDiscoveryRequests() { // DiscoverInstance will attempt to discover (poll) an instance (unless // it is already up-to-date) and will also ensure that its primary and // replicas (if any) are also checked. -func DiscoverInstance(tabletAlias string, forceDiscovery bool) { +func DiscoverInstance(tabletAlias *topodatapb.TabletAlias, forceDiscovery bool) { + tabletAliasString := topoproto.TabletAliasString(tabletAlias) if inst.InstanceIsForgotten(tabletAlias) { - log.Infof("discoverInstance: skipping discovery of %+v because it is set to be forgotten", tabletAlias) + log.Infof("discoverInstance: skipping discovery of %+v because it is set to be forgotten", tabletAliasString) return } @@ -148,18 +151,18 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { discoveryTime := latency.Elapsed("total") if discoveryTime > config.GetInstancePollTime() { instancePollSecondsExceededCounter.Add(1) - log.Warningf("discoverInstance exceeded InstancePollSeconds for %+v, took %.4fs", tabletAlias, discoveryTime.Seconds()) + log.Warningf("discoverInstance exceeded InstancePollSeconds for %+v, took %.4fs", tabletAliasString, discoveryTime.Seconds()) } }() - if tabletAlias == "" { + if tabletAlias == nil { return } // Calculate the expiry period each time as InstancePollSeconds // _may_ change during the run of the process (via SIGHUP) and // it is not possible to change the cache's default expiry.. - if existsInCacheError := recentDiscoveryOperationKeys.Add(tabletAlias, true, config.GetInstancePollTime()); existsInCacheError != nil && !forceDiscovery { + if existsInCacheError := recentDiscoveryOperationKeys.Add(tabletAliasString, true, config.GetInstancePollTime()); existsInCacheError != nil && !forceDiscovery { // Just recently attempted return } @@ -188,16 +191,16 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { discoveryInstanceTimings.Add("Other", otherLatency) if err != nil { - log.Errorf("Failed to discover %s (force: %t), err: %v", tabletAlias, forceDiscovery, err) + log.Errorf("Failed to discover %s (force: %t), err: %v", tabletAliasString, forceDiscovery, err) } else { - log.Infof("Discovered %s (force: %t): %+v", tabletAlias, forceDiscovery, instance) + log.Infof("Discovered %s (force: %t): %+v", tabletAliasString, forceDiscovery, instance) } if instance == nil { failedDiscoveriesCounter.Add(1) - if util.ClearToLog("discoverInstance", tabletAlias) { + if util.ClearToLog("discoverInstance", tabletAliasString) { log.Warningf("DiscoverInstance(%+v) instance is nil in %.3fs (Backend: %.3fs, Instance: %.3fs), error=%+v", - tabletAlias, + tabletAliasString, totalLatency.Seconds(), backendLatency.Seconds(), instanceLatency.Seconds(), @@ -209,7 +212,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { // onHealthTick handles the actions to take to discover/poll instances func onHealthTick() { - tabletAliases, err := inst.ReadOutdatedInstanceKeys() + tabletAliases, err := inst.ReadOutdatedInstances() if err != nil { log.Error(err) } @@ -217,21 +220,17 @@ func onHealthTick() { func() { // Normally onHealthTick() shouldn't run concurrently. It is kicked by a ticker. // However it _is_ invoked inside a goroutine. I like to be safe here. - snapshotDiscoveryKeysMutex.Lock() - defer snapshotDiscoveryKeysMutex.Unlock() + snapshotDiscoveryAliasesMutex.Lock() + defer snapshotDiscoveryAliasesMutex.Unlock() - countSnapshotKeys := len(snapshotDiscoveryKeys) - for i := 0; i < countSnapshotKeys; i++ { - tabletAliases = append(tabletAliases, <-snapshotDiscoveryKeys) + countSnapshotAliases := len(snapshotDiscoveryAliases) + for i := 0; i < countSnapshotAliases; i++ { + tabletAliases = append(tabletAliases, <-snapshotDiscoveryAliases) } }() - // avoid any logging unless there's something to be done - if len(tabletAliases) > 0 { - for _, tabletAlias := range tabletAliases { - if tabletAlias != "" { - discoveryQueue.Push(tabletAlias) - } - } + + for _, tabletAlias := range tabletAliases { + discoveryQueue.Push(tabletAlias) } } diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index 172760fd384..92864923be5 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/acl" "vitess.io/vitess/go/viperutil/debug" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/logic" "vitess.io/vitess/go/vt/vtorc/process" @@ -42,7 +43,6 @@ const ( disableGlobalRecoveriesAPI = "/api/disable-global-recoveries" enableGlobalRecoveriesAPI = "/api/enable-global-recoveries" detectionAnalysisAPI = "/api/detection-analysis" - replicationAnalysisAPI = "/api/replication-analysis" // TODO: remove in v24+ databaseStateAPI = "/api/database-state" configAPI = "/api/config" healthAPI = "/debug/health" @@ -59,7 +59,6 @@ var ( disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI, detectionAnalysisAPI, - replicationAnalysisAPI, databaseStateAPI, configAPI, healthAPI, @@ -85,7 +84,7 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request problemsAPIHandler(response, request) case errantGTIDsAPI: errantGTIDsAPIHandler(response, request) - case detectionAnalysisAPI, replicationAnalysisAPI: + case detectionAnalysisAPI: detectionAnalysisAPIHandler(response, request) case databaseStateAPI: databaseStateAPIHandler(response) @@ -105,7 +104,7 @@ func getACLPermissionLevelForAPI(apiEndpoint string) string { return acl.MONITORING case disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI: return acl.ADMIN - case detectionAnalysisAPI, replicationAnalysisAPI, configAPI: + case detectionAnalysisAPI, configAPI: return acl.MONITORING case healthAPI, databaseStateAPI: return acl.MONITORING @@ -210,6 +209,46 @@ func enableGlobalRecoveriesAPIHandler(response http.ResponseWriter) { writePlainTextResponse(response, "Global recoveries enabled", http.StatusOK) } +// LegacyDetectionAnalysisJSON is a wrapper to *inst.DetectionAnalysis that +// provides the AnalyzedInstanceAlias field in the old string-based format. +type LegacyDetectionAnalysisJSON struct { + *inst.DetectionAnalysis +} + +// NewLegacyDetectionAnalysisJSON wraps a *inst.DetectionAnalysis with *LegacyDetectionAnalysisJSON. +func NewLegacyDetectionAnalysisJSON(da *inst.DetectionAnalysis) *LegacyDetectionAnalysisJSON { + return &LegacyDetectionAnalysisJSON{da} +} + +// MarshalJSON converts a *LegacyDetectionAnalysisJSON to the legacy format JSON. +func (ldaj *LegacyDetectionAnalysisJSON) MarshalJSON() ([]byte, error) { + type Alias LegacyDetectionAnalysisJSON + return json.Marshal(&struct { + AnalyzedInstanceAlias string `json:"AnalyzedInstanceAlias"` + *Alias + }{ + AnalyzedInstanceAlias: topoproto.TabletAliasString(ldaj.AnalyzedInstanceAlias), + Alias: (*Alias)(ldaj), + }) +} + +// UnmarshalJSON converts JSON to a *LegacyDetectionAnalysisJSON. +func (ldaj *LegacyDetectionAnalysisJSON) UnmarshalJSON(data []byte) (err error) { + type Alias LegacyDetectionAnalysisJSON + s := &struct { + AnalyzedInstanceAlias string `json:"AnalyzedInstanceAlias"` + *Alias + }{ + AnalyzedInstanceAlias: topoproto.TabletAliasString(ldaj.AnalyzedInstanceAlias), + Alias: (*Alias)(ldaj), + } + if err := json.Unmarshal(data, &s); err != nil { + return nil + } + ldaj.AnalyzedInstanceAlias, err = topoproto.ParseTabletAlias(s.AnalyzedInstanceAlias) + return err +} + // detectionAnalysisAPIHandler is the handler for the detectionAnalysisAPI endpoint func detectionAnalysisAPIHandler(response http.ResponseWriter, request *http.Request) { // This api also supports filtering by shard and keyspace provided. @@ -224,10 +263,11 @@ func detectionAnalysisAPIHandler(response http.ResponseWriter, request *http.Req http.Error(response, err.Error(), http.StatusInternalServerError) return } - - // TODO: We can also add filtering for a specific instance too based on the tablet alias. - // Currently inst.DetectionAnalysis doesn't store the tablet alias, but once it does we can filter on that too - returnAsJSON(response, http.StatusOK, analysis) + processed := make([]*LegacyDetectionAnalysisJSON, 0) + for _, a := range analysis { + processed = append(processed, NewLegacyDetectionAnalysisJSON(a)) + } + returnAsJSON(response, http.StatusOK, processed) } // healthAPIHandler is the handler for the healthAPI endpoint diff --git a/go/vt/vtorc/server/api_test.go b/go/vt/vtorc/server/api_test.go index e46f4d86f8a..d04e7cb5621 100644 --- a/go/vt/vtorc/server/api_test.go +++ b/go/vt/vtorc/server/api_test.go @@ -28,9 +28,6 @@ func TestGetACLPermissionLevelForAPI(t *testing.T) { }, { apiEndpoint: detectionAnalysisAPI, want: acl.MONITORING, - }, { - apiEndpoint: replicationAnalysisAPI, - want: acl.MONITORING, }, { apiEndpoint: healthAPI, want: acl.MONITORING,