Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func TestReparentIgnoreReplicas(t *testing.T) {

// We expect this one to fail because we have an unreachable replica
out, err := utils.Ers(clusterInstance, nil, "60s", "30s")
require.NotNil(t, err, out)
require.Error(t, err, out)

// Now let's run it again, but set the command to ignore the unreachable replica.
out, err = utils.ErsIgnoreTablet(clusterInstance, nil, "60s", "30s", []*cluster.Vttablet{tablets[2]}, false)
require.Nil(t, err, out)
require.NoError(t, err, out)

// We'll bring back the replica we took down.
utils.RestartTablet(t, clusterInstance, tablets[2])
Expand All @@ -94,12 +94,48 @@ func TestReparentIgnoreReplicas(t *testing.T) {
newPrimary := utils.GetNewPrimary(t, clusterInstance)
// Check new primary has latest transaction.
err = utils.CheckInsertedValues(ctx, t, newPrimary, insertVal)
require.Nil(t, err)
require.NoError(t, err)

// bring back the old primary as a replica, check that it catches up
utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0])
}

// TestReparentIgnoreMySQLDownReplica tests that reachable vttablets with mysqld crashed/down (reporting
// vtrpcpb.Code_UNAVAILABLE error code) are ignored in EmergencyReparentShard actions (requires v24+).
func TestReparentIgnoreMySQLDownReplica(t *testing.T) {
// Skip test on vtctld versions < v24.
vtctldMajorVer, err := cluster.GetMajorVersion("vtctld")
require.NoError(t, err)
if vtctldMajorVer < 24 {
t.Skip("Skipping test since `StopReplicationAndGetStatus` does not provide the required error codes on vtctld < v24")
}

// Setup reparent cluster.
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
insertVal := utils.ConfirmReplication(t, tablets[0], tablets[1:])

// Make the current primary tablet and database unavailable.
utils.StopTablet(t, tablets[0], true)

// Take down MySQL (but not vttablet) on a replica.
utils.StopTabletMySQL(t, tablets[1])

// We expect this ERS to succeed because we ignored the tablet with MySQL down.
out, err := utils.Ers(clusterInstance, nil, "60s", "30s")
require.NoError(t, err, out)

// Confirm the primary alias changed.
newPrimary := utils.GetNewPrimary(t, clusterInstance)
require.NotEqualValues(t, tablets[0].Alias, newPrimary.Alias) // original primary
require.NotEqualValues(t, tablets[1].Alias, newPrimary.Alias) // replica w/mysqld down

// Check new primary has latest transaction.
err = utils.CheckInsertedValues(context.Background(), t, newPrimary, insertVal)
require.NoError(t, err)
}

func TestReparentDownPrimary(t *testing.T) {
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
Expand Down Expand Up @@ -377,7 +413,7 @@ func TestERSPromoteRdonly(t *testing.T) {

// We expect this one to fail because we have ignored all the replicas and have only the rdonly's which should not be promoted
out, err := utils.ErsIgnoreTablet(clusterInstance, nil, "30s", "30s", []*cluster.Vttablet{tablets[3]}, false)
require.NotNil(t, err, out)
require.Error(t, err, out)

out, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetShard", utils.KeyspaceShard)
require.NoError(t, err)
Expand Down
10 changes: 7 additions & 3 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func Ers(clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet, to
func ErsIgnoreTablet(clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet, timeout, waitReplicasTimeout string, tabletsToIgnore []*cluster.Vttablet, preventCrossCellPromotion bool) (string, error) {
var args []string
if timeout != "" {
args = append(args, "--action_timeout", timeout)
args = append(args, "--action-timeout", timeout)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolves a warning because of underscore deprecation

}
args = append(args, "EmergencyReparentShard", fmt.Sprintf("%s/%s", KeyspaceName, ShardName))
if tab != nil {
Expand Down Expand Up @@ -574,11 +574,15 @@ func StopTablet(t *testing.T, tab *cluster.Vttablet, stopDatabase bool) {
err := tab.VttabletProcess.TearDownWithTimeout(30 * time.Second)
require.NoError(t, err)
if stopDatabase {
err = tab.MysqlctlProcess.Stop()
require.NoError(t, err)
StopTabletMySQL(t, tab)
}
}

// StopTabletMySQL stops the database on a tablet.
func StopTabletMySQL(t *testing.T, tab *cluster.Vttablet) {
require.NoError(t, tab.MysqlctlProcess.Stop())
}

// RestartTablet restarts the tablet
func RestartTablet(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet) {
tab.MysqlctlProcess.InitMysql = false
Expand Down
26 changes: 16 additions & 10 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
tmc "vitess.io/vitess/go/vt/vttablet/grpctmclient"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -68,9 +67,9 @@ func TestEnsureDB(t *testing.T) {
killTablets(tablet)
}

// TestGRPCErrorCode_UNAVAILABLE tests that vttablet returns correct gRPC codes,
// in this case codes.Unavailable/vtrpcpb.Code_UNAVAILABLE when mysqld is down.
func TestGRPCErrorCode_UNAVAILABLE(t *testing.T) {
// TestGRPCErrorCode_MySQLDown tests that vttablet returns the correct vtrpcpb error code
// (vtrpcpb.Code_UNAVAILABLE) when mysqld is down but vttablet is still up.
func TestGRPCErrorCode_MySQLDown(t *testing.T) {
// Create new tablet
tablet := clusterInstance.NewVttabletInstance("replica", 0, "")
defer killTablets(tablet)
Expand All @@ -86,18 +85,25 @@ func TestGRPCErrorCode_UNAVAILABLE(t *testing.T) {
err = clusterInstance.StartVttablet(tablet, false, "SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

vttablet := getTablet(tablet.GrpcPort)

// test FullStatus before stopping mysql
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
defer cancel()
res, err := tmClient.FullStatus(ctx, vttablet)
require.NotNil(t, res)
require.Equal(t, vtrpcpb.Code_OK, vterrors.Code(err))

// kill the mysql process
err = tablet.MysqlctlProcess.Stop()
require.NoError(t, err)

// confirm we get vtrpcpb.Code_UNAVAILABLE when calling FullStatus,
// because this will try and fail to connect to mysql
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tmClient := tmc.NewClient()
vttablet := getTablet(tablet.GrpcPort)
_, err = tmClient.FullStatus(ctx, vttablet)
assert.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err))
ctx2, cancel2 := context.WithTimeout(t.Context(), time.Second*10)
defer cancel2()
_, err = tmClient.FullStatus(ctx2, vttablet)
require.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err))
}

// TestResetReplicationParameters tests that the RPC ResetReplicationParameters works as intended.
Expand Down
23 changes: 19 additions & 4 deletions go/vt/vtctl/reparentutil/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -117,7 +117,7 @@ func FindPositionsOfAllCandidates(
// Potentially bail. If any other tablet is detected to have
// GTID-based relay log positions, we will return the error recorded
// here.
emptyRelayPosErrorRecorder.RecordError(vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias))
emptyRelayPosErrorRecorder.RecordError(vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias))
}
}

Expand All @@ -126,7 +126,7 @@ func FindPositionsOfAllCandidates(
}

if isGTIDBased && isNonGTIDBased {
return nil, false, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "encountered mix of GTID-based and non GTID-based relay logs")
return nil, false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "encountered mix of GTID-based and non GTID-based relay logs")
}

// Store the final positions in the map.
Expand Down Expand Up @@ -159,7 +159,7 @@ func FindPositionsOfAllCandidates(
// error if the Before state of replication is nil.
func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) (bool, error) {
if stopStatus == nil || stopStatus.Before == nil {
return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus)
return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus)
}

replStatus := replication.ProtoToReplicationStatus(stopStatus.Before)
Expand Down Expand Up @@ -251,6 +251,21 @@ func stopReplicationAndBuildStatusMaps(

stopReplicationStatus, err := tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY)
if err != nil {
// If we receive a vtrpcpb.Code_UNAVAILABLE error code from the StopReplicationAndGetStatus RPC,
// this means the call was received by vttablet but the backend mysqld is down/unreachable. We log
// and skip tablets in this state because we are reasonably sure they cannot be the most advanced
// because mysqld is (likely) down. In some cases this may not be true and mysqld IS running + most
// advanced but somehow vttablet sees it as down, but this should be a very rare exception, meaning
// we prioritize completing the reparent (availability) for the common case. If this edge case were
// to occur, errant GTID(s) will be produced; if this happens often we should return UNAVAILABLE
// from vttablet using more detailed criteria (check the pidfile + running PID, etc).
Comment on lines +259 to +261
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's not worth improving this case today? The lack of detail may have been in place simply because it did not impact any operations. But now we're building logic around the meaning we infer from the response. I'd say we should do this now, provided you have an idea how to do it.
What do we get in the error? I wonder if it's not an SQLError we can extract that maps to one of these: https://dev.mysql.com/doc/refman/en/gone-away.html
You can see elsewhere in the code base where we look to see if the error contains an SQL error (a MySQL error, and if so, if the code matches 1 or more MySQL error codes). Actually... just below this here 😆

if topo.IsReplicaType(tabletInfo.Tablet.Type) && vterrors.Code(err) == vtrpcpb.Code_UNAVAILABLE {
logger.Warningf("replica %v is reachable but mysql is unavailable: %v", alias, err)
mustWaitForTablet = false // used in defer
err = nil // used in defer
return
}

sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERNotReplica {
var primaryStatus *replicationdatapb.PrimaryStatus
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestDemotePrimaryWhenSemiSyncBecomesUnblockedBetweenChecks(t *testing.T) {
}

// TestUndoDemotePrimaryStateChange tests that UndoDemotePrimary
// if able to change the state of the tablet to Primary if there
// is able to change the state of the tablet to Primary if there
// is a mismatch with the tablet record.
func TestUndoDemotePrimaryStateChange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
Loading