Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ func TestReparentIgnoreReplicas(t *testing.T) {
utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0])
}

func TestReparentIgnoreMySQLDownReplica(t *testing.T) {
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

insertVal := utils.ConfirmReplication(t, tablets[0], tablets[1:])

// Make the current primary agent and database unavailable.
utils.StopTablet(t, tablets[0], true)

// Take down MySQL (but not vttablet) on a replica.
utils.StopTabletMySQL(t, tablets[1])

// We expect this ERS to succeed because we ignored the tablet with MySQL down.
out, err := utils.Ers(clusterInstance, nil, "60s", "30s")
require.Nil(t, err, out)

newPrimary := utils.GetNewPrimary(t, clusterInstance)
// Check new primary has latest transaction.
err = utils.CheckInsertedValues(context.Background(), t, newPrimary, insertVal)
require.Nil(t, err)
}

func TestReparentDownPrimary(t *testing.T) {
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
Expand Down
19 changes: 11 additions & 8 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/utils"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/utils"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
)

var (
Expand Down Expand Up @@ -396,7 +395,7 @@ func Ers(clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet, to
func ErsIgnoreTablet(clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet, timeout, waitReplicasTimeout string, tabletsToIgnore []*cluster.Vttablet, preventCrossCellPromotion bool) (string, error) {
var args []string
if timeout != "" {
args = append(args, "--action_timeout", timeout)
args = append(args, "--action-timeout", timeout)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolves a warning because of underscore deprecation

}
args = append(args, "EmergencyReparentShard", fmt.Sprintf("%s/%s", KeyspaceName, ShardName))
if tab != nil {
Expand Down Expand Up @@ -575,11 +574,15 @@ func StopTablet(t *testing.T, tab *cluster.Vttablet, stopDatabase bool) {
err := tab.VttabletProcess.TearDownWithTimeout(30 * time.Second)
require.NoError(t, err)
if stopDatabase {
err = tab.MysqlctlProcess.Stop()
require.NoError(t, err)
StopTabletMySQL(t, tab)
}
}

// StopTabletMySQL stops the database on a tablet.
func StopTabletMySQL(t *testing.T, tab *cluster.Vttablet) {
require.NoError(t, tab.MysqlctlProcess.Stop())
}

// RestartTablet restarts the tablet
func RestartTablet(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tab *cluster.Vttablet) {
tab.MysqlctlProcess.InitMysql = false
Expand Down
24 changes: 16 additions & 8 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func TestEnsureDB(t *testing.T) {
killTablets(tablet)
}

// TestGRPCErrorCode_UNAVAILABLE tests that vttablet returns correct gRPC codes,
// in this case codes.Unavailable/vtrpcpb.Code_UNAVAILABLE when mysqld is down.
func TestGRPCErrorCode_UNAVAILABLE(t *testing.T) {
// TestGRPCErrorCode_MySQLDown tests that vttablet returns the correct vtrpcpb error code
// (vtrpcpb.Code_UNAVAILABLE) when mysqld is down but vttablet is still up.
func TestGRPCErrorCode_MySQLDown(t *testing.T) {
// Create new tablet
tablet := clusterInstance.NewVttabletInstance("replica", 0, "")
defer killTablets(tablet)
Expand All @@ -86,18 +86,26 @@ func TestGRPCErrorCode_UNAVAILABLE(t *testing.T) {
err = clusterInstance.StartVttablet(tablet, false, "SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

vttablet := getTablet(tablet.GrpcPort)

// test FullStatus before stopping mysql
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res, err := tmClient.FullStatus(ctx, vttablet)
require.NotNil(t, res)
require.Equal(t, vtrpcpb.Code_OK, vterrors.Code(err))

// kill the mysql process
err = tablet.MysqlctlProcess.Stop()
require.NoError(t, err)

// confirm we get vtrpcpb.Code_UNAVAILABLE when calling FullStatus,
// because this will try and fail to connect to mysql
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second*10)
defer cancel2()
tmClient := tmc.NewClient()
vttablet := getTablet(tablet.GrpcPort)
_, err = tmClient.FullStatus(ctx, vttablet)
assert.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err))
_, err = tmClient.FullStatus(ctx2, vttablet)
require.Equal(t, vtrpcpb.Code_UNAVAILABLE, vterrors.Code(err))
}

// TestResetReplicationParameters tests that the RPC ResetReplicationParameters works as intended.
Expand Down
15 changes: 11 additions & 4 deletions go/vt/vtctl/reparentutil/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -117,7 +117,7 @@ func FindPositionsOfAllCandidates(
// Potentially bail. If any other tablet is detected to have
// GTID-based relay log positions, we will return the error recorded
// here.
emptyRelayPosErrorRecorder.RecordError(vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias))
emptyRelayPosErrorRecorder.RecordError(vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias))
}
}

Expand All @@ -126,7 +126,7 @@ func FindPositionsOfAllCandidates(
}

if isGTIDBased && isNonGTIDBased {
return nil, false, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "encountered mix of GTID-based and non GTID-based relay logs")
return nil, false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "encountered mix of GTID-based and non GTID-based relay logs")
}

// Store the final positions in the map.
Expand Down Expand Up @@ -159,7 +159,7 @@ func FindPositionsOfAllCandidates(
// error if the Before state of replication is nil.
func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) (bool, error) {
if stopStatus == nil || stopStatus.Before == nil {
return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus)
return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus)
}

replStatus := replication.ProtoToReplicationStatus(stopStatus.Before)
Expand Down Expand Up @@ -251,6 +251,13 @@ func stopReplicationAndBuildStatusMaps(

stopReplicationStatus, err := tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY)
if err != nil {
if vterrors.Code(err) == vtrpcpb.Code_UNAVAILABLE {
logger.Warningf("replica %v is reachable but mysqld is unavailable", alias)
err = nil
mustWaitForTablet = false
return
}

sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERNotReplica {
var primaryStatus *replicationdatapb.PrimaryStatus
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// ReplicationStatus returns the replication status
Expand Down Expand Up @@ -980,6 +979,9 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe
// returns an error, so a user can optionally inspect the status before a stop was called.
rs, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok {
return StopReplicationAndGetStatusResponse{}, vterrors.New(sqlErr.VtRpcErrorCode(), err.Error())
}
return StopReplicationAndGetStatusResponse{}, vterrors.Wrap(err, "before status failed")
}
before := replication.ReplicationStatusToProto(rs)
Expand Down
Loading