diff --git a/go/vt/mysqlctl/clone.go b/go/vt/mysqlctl/clone.go index e154bf58cd2..928853742e7 100644 --- a/go/vt/mysqlctl/clone.go +++ b/go/vt/mysqlctl/clone.go @@ -23,12 +23,21 @@ import ( "strings" "time" + "github.com/spf13/pflag" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/capabilities" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/utils" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttls" ) @@ -38,6 +47,97 @@ const ( cloneStatusQuery = "SELECT STATE, ERROR_NO, ERROR_MESSAGE FROM performance_schema.clone_status ORDER BY ID DESC LIMIT 1" ) +var ( + cloneFromPrimary = false + cloneFromTablet = "" + cloneRestartWaitTimeout = 5 * time.Minute +) + +func init() { + // TODO: enable these flags for vttablet and vtbackup. + for _, cmd := range []string{ /*"vttablet", "vtbackup"*/ } { + servenv.OnParseFor(cmd, registerCloneFlags) + } +} + +func registerCloneFlags(fs *pflag.FlagSet) { + utils.SetFlagBoolVar(fs, &cloneFromPrimary, "clone-from-primary", cloneFromPrimary, "Clone data from the primary tablet in the shard using MySQL CLONE REMOTE instead of restoring from backup. Requires MySQL 8.0.17+. Mutually exclusive with --clone-from-tablet.") + utils.SetFlagStringVar(fs, &cloneFromTablet, "clone-from-tablet", cloneFromTablet, "Clone data from this tablet using MySQL CLONE REMOTE instead of restoring from backup (tablet alias, e.g., zone1-123). Requires MySQL 8.0.17+. Mutually exclusive with --clone-from-primary.") + utils.SetFlagDurationVar(fs, &cloneRestartWaitTimeout, "clone-restart-wait-timeout", cloneRestartWaitTimeout, "Timeout for waiting for MySQL to restart after CLONE REMOTE.") +} + +// CloneFromDonor clones data from the specified donor tablet using MySQL CLONE REMOTE. +// It returns the GTID position of the cloned data. +func CloneFromDonor(ctx context.Context, topoServer *topo.Server, mysqld MysqlDaemon, keyspace, shard string) (replication.Position, error) { + var donorAlias *topodatapb.TabletAlias + var err error + + switch { + case cloneFromPrimary && cloneFromTablet != "": + return replication.Position{}, errors.New("--clone-from-primary and --clone-from-tablet are mutually exclusive") + case cloneFromPrimary: + // Look up the primary tablet from topology. + log.Infof("Looking up primary tablet for shard %s/%s for use as CLONE REMOTE donor", keyspace, shard) + si, err := topoServer.GetShard(ctx, keyspace, shard) + if err != nil { + return replication.Position{}, fmt.Errorf("failed to get shard %s/%s: %v", keyspace, shard, err) + } + if topoproto.TabletAliasIsZero(si.PrimaryAlias) { + return replication.Position{}, fmt.Errorf("shard %s/%s has no primary", keyspace, shard) + } + donorAlias = si.PrimaryAlias + log.Infof("Found primary tablet %s for use as CLONE REMOTE donor", topoproto.TabletAliasString(donorAlias)) + case cloneFromTablet != "": + // Parse the explicit donor tablet alias. + log.Infof("Using tablet %s for use as CLONE REMOTE donor", cloneFromTablet) + donorAlias, err = topoproto.ParseTabletAlias(cloneFromTablet) + if err != nil { + return replication.Position{}, fmt.Errorf("invalid tablet alias %q: %v", cloneFromTablet, err) + } + default: + return replication.Position{}, errors.New("no donor specified") + } + + // Get donor tablet info from topology. + donorTablet, err := topoServer.GetTablet(ctx, donorAlias) + if err != nil { + return replication.Position{}, fmt.Errorf("failed to get tablet %s from topology: %v", topoproto.TabletAliasString(donorAlias), err) + } + + // Get clone credentials. + cloneConfig := dbconfigs.GlobalDBConfigs.CloneUser + if cloneConfig.User == "" { + return replication.Position{}, errors.New("clone user not configured; set --db-clone-user flag") + } + + // Create the clone executor. + executor := &CloneExecutor{ + DonorHost: donorTablet.MysqlHostname, + DonorPort: int(donorTablet.MysqlPort), + DonorUser: cloneConfig.User, + DonorPassword: cloneConfig.Password, + UseSSL: cloneConfig.UseSSL, + } + + log.Infof("Clone executor configured for donor %s:%d", executor.DonorHost, executor.DonorPort) + + // Execute the clone operation. + // Note: ExecuteClone will wait for mysqld to restart and for the CLONE plugin to report successful completion + // success via performance_schema before returning. + if err := executor.ExecuteClone(ctx, mysqld, cloneRestartWaitTimeout); err != nil { + return replication.Position{}, fmt.Errorf("clone execution failed: %v", err) + } + + // Get the GTID position from the cloned data. + pos, err := mysqld.PrimaryPosition(ctx) + if err != nil { + return replication.Position{}, fmt.Errorf("failed to get position after clone: %v", err) + } + + log.Infof("Clone completed successfully at position %v", pos) + return pos, nil +} + // CloneExecutor handles MySQL CLONE REMOTE operations for backup and replica provisioning. // It executes CLONE INSTANCE FROM on the recipient to clone data from a donor. type CloneExecutor struct { diff --git a/go/vt/mysqlctl/clone_test.go b/go/vt/mysqlctl/clone_test.go index 4ab33d4dad2..46dc8449f3c 100644 --- a/go/vt/mysqlctl/clone_test.go +++ b/go/vt/mysqlctl/clone_test.go @@ -18,15 +18,29 @@ package mysqlctl import ( "context" + "errors" "fmt" + "net" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/logutil" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttls" ) @@ -378,3 +392,339 @@ func TestValidateRecipient(t *testing.T) { }) } } + +type cloneFromDonorTestEnv struct { + ctx context.Context + logger *logutil.MemoryLogger + ts *topo.Server + mysqld *FakeMysqlDaemon + keyspace string + shard string + donorHost string + donorPort int + donorAlias *topodatapb.TabletAlias +} + +func createCloneFromDonorTestEnv(t *testing.T, donorHost string, donorPort int) *cloneFromDonorTestEnv { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + + // Create in-memory topo server with a test cell + ts := memorytopo.NewServer(ctx, "cell1") + + keyspace := "test" + shard := "-" + + // Create keyspace in topology + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + + // Create donor tablet alias + donorAlias := &topodatapb.TabletAlias{Cell: "cell1", Uid: 100} + + // Create shard in topology with donor as primary + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + _, err := ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = donorAlias + return nil + }) + require.NoError(t, err) + + // Create donor tablet in topology with the mock server's address + tablet := &topodatapb.Tablet{ + Alias: donorAlias, + MysqlHostname: donorHost, + MysqlPort: int32(donorPort), + Keyspace: keyspace, + Shard: shard, + } + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + // Create fake MySQL daemon + sqldb := fakesqldb.New(t) + sqldb.SetNeverFail(true) + mysqld := NewFakeMysqlDaemon(sqldb) + + // Set up default clone credentials (success path) + dbconfigs.GlobalDBConfigs.CloneUser = dbconfigs.UserConfig{ + User: "clone_user", + Password: "password", + } + + // Configure recipient mysqld for successful validation and clone by default + mysqld.Version = "8.0.32" + mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SELECT @@version": sqltypes.MakeTestResult( + sqltypes.MakeTestFields("@@version", "varchar"), + "8.0.32", + ), + "SELECT PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME = 'clone'": sqltypes.MakeTestResult( + sqltypes.MakeTestFields("PLUGIN_STATUS", "varchar"), + "ACTIVE", + ), + "SELECT STATE, ERROR_NO, ERROR_MESSAGE FROM performance_schema.clone_status": sqltypes.MakeTestResult( + sqltypes.MakeTestFields("STATE|ERROR_NO|ERROR_MESSAGE", "varchar|varchar|varchar"), + "Completed|0|", + ), + } + + // Set a valid GTID position by default + mysqld.CurrentPrimaryPosition = replication.Position{ + GTIDSet: replication.Mysql56GTIDSet{}, + } + + // List all expected queries that ExecuteClone will run + mysqld.ExpectedExecuteSuperQueryList = []string{ + fmt.Sprintf("SET GLOBAL clone_valid_donor_list = '%s:%d'", donorHost, donorPort), + fmt.Sprintf("CLONE INSTANCE FROM 'clone_user'@'%s':%d IDENTIFIED BY 'password' REQUIRE NO SSL", donorHost, donorPort), + } + + t.Cleanup(func() { + mysqld.Close() + sqldb.Close() + }) + + return &cloneFromDonorTestEnv{ + ctx: ctx, + logger: logger, + ts: ts, + mysqld: mysqld, + keyspace: keyspace, + shard: shard, + donorHost: donorHost, + donorPort: donorPort, + donorAlias: donorAlias, + } +} + +// mockDonorHandler is used to create a minimal mysqld server that the recipient +// can connect to to verify it's safe to CLONE from. +type mockDonorHandler struct { + mysql.UnimplementedHandler + t *testing.T +} + +func (h *mockDonorHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error { + // Respond to donor validation queries + switch { + case strings.Contains(query, "SELECT @@version"): + result := sqltypes.MakeTestResult( + sqltypes.MakeTestFields("@@version", "varchar"), + "8.0.32", + ) + return callback(result) + case strings.Contains(query, "SELECT PLUGIN_STATUS"): + result := sqltypes.MakeTestResult( + sqltypes.MakeTestFields("PLUGIN_STATUS", "varchar"), + "ACTIVE", + ) + return callback(result) + case strings.Contains(query, "SELECT TABLE_SCHEMA"): + // Return empty result (no non-InnoDB tables) + result := sqltypes.MakeTestResult( + sqltypes.MakeTestFields("TABLE_SCHEMA|TABLE_NAME|ENGINE", "varchar|varchar|varchar"), + ) + return callback(result) + default: + return fmt.Errorf("unexpected query: %s", query) + } +} + +func (h *mockDonorHandler) ComQueryMulti(c *mysql.Conn, sql string, callback func(qr sqltypes.QueryResponse, more bool, firstPacket bool) error) error { + return errors.New("ComQueryMulti not implemented") +} + +func (h *mockDonorHandler) ComPrepare(c *mysql.Conn, query string) ([]*querypb.Field, uint16, error) { + return nil, 0, errors.New("ComPrepare not implemented") +} + +func (h *mockDonorHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error { + return errors.New("ComStmtExecute not implemented") +} + +func (h *mockDonorHandler) ComRegisterReplica(c *mysql.Conn, replicaHost string, replicaPort uint16, replicaUser string, replicaPassword string) error { + return errors.New("ComRegisterReplica not implemented") +} + +func (h *mockDonorHandler) ComBinlogDump(c *mysql.Conn, logFile string, binlogPos uint32) error { + return errors.New("ComBinlogDump not implemented") +} + +func (h *mockDonorHandler) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error { + return errors.New("ComBinlogDumpGTID not implemented") +} + +func (h *mockDonorHandler) WarningCount(c *mysql.Conn) uint16 { + return 0 +} + +func (h *mockDonorHandler) Env() *vtenv.Environment { + return vtenv.NewTestEnv() +} + +func TestCloneFromDonor(t *testing.T) { + // Create mock donor MySQL server once for all test cases + jsonConfig := `{"clone_user": [{"Password": "password"}]}` + authServer := mysql.NewAuthServerStatic("", jsonConfig, 0) + handler := &mockDonorHandler{t: t} + + listener, err := mysql.NewListener("tcp", "127.0.0.1:", authServer, handler, 0, 0, false, false, 0, 0, false) + require.NoError(t, err) + + // Start accepting connections + go listener.Accept() + + // Clean up when all tests complete + t.Cleanup(func() { + listener.Close() + utils.EnsureNoLeaks(t) + }) + + // Get the assigned host/port + donorHost := listener.Addr().(*net.TCPAddr).IP.String() + donorPort := listener.Addr().(*net.TCPAddr).Port + + testCases := []struct { + name string + cloneFromPrimary bool + cloneFromTablet string + setup func(*testing.T, *cloneFromDonorTestEnv) + wantErr bool + wantErrContains string + }{ + { + name: "both cloneFromPrimary and cloneFromTablet specified", + cloneFromPrimary: true, + cloneFromTablet: "cell1-100", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // No setup needed, mutual exclusivity check happens first + }, + wantErr: true, + wantErrContains: "mutually exclusive", + }, + { + name: "clone from primary, get shard fails", + cloneFromPrimary: true, + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // Delete the shard that was created in env setup + require.NoError(t, env.ts.DeleteShard(env.ctx, env.keyspace, env.shard)) + }, + wantErr: true, + wantErrContains: "failed to get shard", + }, + { + name: "clone from primary, shard has no primary", + cloneFromPrimary: true, + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // Clear the primary alias from the shard + _, err := env.ts.UpdateShardFields(env.ctx, env.keyspace, env.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = nil + return nil + }) + require.NoError(t, err) + }, + wantErr: true, + wantErrContains: "has no primary", + }, + { + name: "clone from tablet, invalid tablet alias", + cloneFromTablet: "invalid-alias-format", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // No setup needed, invalid alias will fail parsing + }, + wantErr: true, + wantErrContains: "invalid tablet alias", + }, + { + name: "neither cloneFromPrimary nor cloneFromTablet specified", + cloneFromPrimary: false, + cloneFromTablet: "", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // No setup needed, will fail when no donor is specified + }, + wantErr: true, + wantErrContains: "no donor specified", + }, + { + name: "GetTablet fails", + cloneFromTablet: "cell1-100", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // Delete the tablet that was created in env setup + require.NoError(t, env.ts.DeleteTablet(env.ctx, env.donorAlias)) + }, + wantErr: true, + wantErrContains: "failed to get tablet", + }, + { + name: "clone user not configured", + cloneFromTablet: "cell1-100", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // Clear clone user config + dbconfigs.GlobalDBConfigs.CloneUser = dbconfigs.UserConfig{} + }, + wantErr: true, + wantErrContains: "clone user not configured", + }, + { + name: "get position after clone fails", + cloneFromTablet: "cell1-100", + setup: func(t *testing.T, env *cloneFromDonorTestEnv) { + // Make PrimaryPosition return an error + env.mysqld.PrimaryPositionError = assert.AnError + }, + wantErr: true, + wantErrContains: "failed to get position after clone", + }, + { + name: "success with clone-from-primary", + cloneFromPrimary: true, + wantErr: false, + }, + { + name: "success with clone-from-tablet", + cloneFromTablet: "cell1-100", + wantErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + env := createCloneFromDonorTestEnv(t, donorHost, donorPort) + + // Save and restore global flags and config + oldCloneFromPrimary := cloneFromPrimary + oldCloneFromTablet := cloneFromTablet + oldCloneUser := dbconfigs.GlobalDBConfigs.CloneUser + oldMysqlCloneEnabled := mysqlCloneEnabled + defer func() { + cloneFromPrimary = oldCloneFromPrimary + cloneFromTablet = oldCloneFromTablet + dbconfigs.GlobalDBConfigs.CloneUser = oldCloneUser + mysqlCloneEnabled = oldMysqlCloneEnabled + }() + + // Set test flag values + cloneFromPrimary = tc.cloneFromPrimary + cloneFromTablet = tc.cloneFromTablet + mysqlCloneEnabled = true + + // Run setup if provided + if tc.setup != nil { + tc.setup(t, env) + } + + // Execute CloneFromDonor + pos, err := CloneFromDonor(env.ctx, env.ts, env.mysqld, env.keyspace, env.shard) + + // Verify results + if tc.wantErr { + require.Error(t, err) + if tc.wantErrContains != "" { + assert.ErrorContains(t, err, tc.wantErrContains) + } + } else { + require.NoError(t, err) + assert.NotEmpty(t, pos) + } + }) + } +} diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 8829c3ae578..f279933f78a 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -85,6 +85,9 @@ type FakeMysqlDaemon struct { // and ReplicationStatus. CurrentPrimaryPosition replication.Position + // PrimaryPositionError is used by PrimaryPosition. + PrimaryPositionError error + // CurrentRelayLogPosition is returned by ReplicationStatus. CurrentRelayLogPosition replication.Position @@ -415,6 +418,9 @@ func (fmd *FakeMysqlDaemon) GetPreviousGTIDs(ctx context.Context, binlog string) // PrimaryPosition is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryPosition(ctx context.Context) (replication.Position, error) { + if fmd.PrimaryPositionError != nil { + return replication.Position{}, fmd.PrimaryPositionError + } return fmd.GetPrimaryPositionLocked(), nil }