diff --git a/changelog/24.0/24.0.0/summary.md b/changelog/24.0/24.0.0/summary.md index bf7a89b9959..8c159fb2791 100644 --- a/changelog/24.0/24.0.0/summary.md +++ b/changelog/24.0/24.0.0/summary.md @@ -7,6 +7,8 @@ - **[New Support](#new-support)** - [Window function pushdown for sharded keyspaces](#window-function-pushdown) - **[Minor Changes](#minor-changes)** + - **[VReplication](#minor-changes-vreplication)** + - [`--shards` flag for MoveTables/Reshard start and stop](#vreplication-shards-flag-start-stop) - **[VTGate](#minor-changes-vtgate)** - [New default for `--legacy-replication-lag-algorithm` flag](#vtgate-new-default-legacy-replication-lag-algorithm) - [New "session" mode for `--vtgate-balancer-mode` flag](#vtgate-session-balancer-mode) @@ -36,6 +38,22 @@ For examples and more details, see the [documentation](https://vitess.io/docs/24 ## Minor Changes +### <a id="minor-changes-vreplication"/>VReplication</a> + +#### <a id="vreplication-shards-flag-start-stop"/>`--shards` flag for MoveTables/Reshard start and stop</a> + +The `start` and `stop` commands for MoveTables and Reshard workflows now support the `--shards` flag, allowing users to start or stop workflows on a specific subset of shards rather than all shards at once. 
+ +**Example usage:** + +```bash +# Start workflow on specific shards only +vtctldclient MoveTables --target-keyspace customer --workflow commerce2customer start --shards="-80,80-" + +# Stop workflow on specific shards only +vtctldclient Reshard --target-keyspace customer --workflow cust2cust stop --shards="80-" +``` + ### VTGate #### New default for `--legacy-replication-lag-algorithm` flag diff --git a/go/cmd/vtctldclient/command/vreplication/common/update.go b/go/cmd/vtctldclient/command/vreplication/common/update.go index 896d417d8db..43da34ac907 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/update.go +++ b/go/cmd/vtctldclient/command/vreplication/common/update.go @@ -144,6 +144,7 @@ func commandUpdateState(cmd *cobra.Command, args []string) error { Cells: textutil.SimulatedNullStringSlice, TabletTypes: textutil.SimulatedNullTabletTypeSlice, State: &state, + Shards: StartStopOptions.Shards, }, } diff --git a/go/cmd/vtctldclient/command/vreplication/common/utils.go b/go/cmd/vtctldclient/command/vreplication/common/utils.go index a6ca97a9b93..1c7ce145244 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/utils.go +++ b/go/cmd/vtctldclient/command/vreplication/common/utils.go @@ -332,6 +332,10 @@ func AddCommonCreateFlags(cmd *cobra.Command) { cmd.Flags().StringSliceVar(&CreateOptions.ConfigOverrides, "config-overrides", []string{}, "Specify one or more VReplication config flags to override as a comma-separated list of key=value pairs.") } +var StartStopOptions = struct { + Shards []string +}{} + var MirrorTrafficOptions = struct { DryRun bool Percent float32 diff --git a/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go b/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go index 7a3fa7cafed..0f8c7aa675e 100644 --- a/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go +++ b/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go @@ -78,8 +78,13 @@ func registerCommands(root 
*cobra.Command) { common.AddShardSubsetFlag(statusCommand, &common.StatusOptions.Shards) base.AddCommand(statusCommand) - base.AddCommand(common.GetStartCommand(opts)) - base.AddCommand(common.GetStopCommand(opts)) + startCommand := common.GetStartCommand(opts) + common.AddShardSubsetFlag(startCommand, &common.StartStopOptions.Shards) + base.AddCommand(startCommand) + + stopCommand := common.GetStopCommand(opts) + common.AddShardSubsetFlag(stopCommand, &common.StartStopOptions.Shards) + base.AddCommand(stopCommand) mirrorTrafficCommand := common.GetMirrorTrafficCommand(opts) mirrorTrafficCommand.Flags().Var((*topoproto.TabletTypeListFlag)(&common.MirrorTrafficOptions.TabletTypes), "tablet-types", "Tablet types to mirror traffic for.") diff --git a/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go b/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go index 84d2eec1383..6fccdff1336 100644 --- a/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go +++ b/go/cmd/vtctldclient/command/vreplication/reshard/reshard.go @@ -45,8 +45,13 @@ func registerReshardCommands(root *cobra.Command) { reshard.AddCommand(common.GetShowCommand(opts)) reshard.AddCommand(common.GetStatusCommand(opts)) - reshard.AddCommand(common.GetStartCommand(opts)) - reshard.AddCommand(common.GetStopCommand(opts)) + startCommand := common.GetStartCommand(opts) + common.AddShardSubsetFlag(startCommand, &common.StartStopOptions.Shards) + reshard.AddCommand(startCommand) + + stopCommand := common.GetStopCommand(opts) + common.AddShardSubsetFlag(stopCommand, &common.StartStopOptions.Shards) + reshard.AddCommand(stopCommand) switchTrafficCommand := common.GetSwitchTrafficCommand(opts) common.AddCommonSwitchTrafficFlags(switchTrafficCommand, false) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index f1d37f84920..2f012db00b8 100644 --- 
a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -282,6 +282,28 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) (*mt).Start() waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + t.Run("Test --shards flag in MoveTables start/stop", func(t *testing.T) { + // This subtest expects workflow to be running at the start and restarts it at the end. + type tCase struct { + shards string + action string + expected int + } + testCases := []tCase{ + {"-80", "stop", 1}, + {"80-", "stop", 1}, + {"-80,80-", "start", 2}, + } + for _, tc := range testCases { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("MoveTables", "--target-keyspace", targetKeyspace, "--workflow", defaultWorkflowName, tc.action, "--shards", tc.shards) + require.NoError(t, err, "failed to %s workflow on shards %s: %v", tc.action, tc.shards, err) + cnt := gjson.Get(output, "details.#").Int() + require.EqualValuesf(t, tc.expected, cnt, "expected %d shards, got %d for action %s, shards %s", tc.expected, cnt, tc.action, tc.shards) + } + }) + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + for _, tab := range targetTabs { catchup(t, tab, defaultWorkflowName, "MoveTables") } @@ -615,6 +637,27 @@ func splitShard(t *testing.T, keyspace, defaultWorkflowName, sourceShards, targe }) waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, defaultWorkflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + t.Run("Test --shards flag in Reshard start/stop", func(t *testing.T) { + // This subtest expects workflow to be running at the start and restarts it at the end. 
+ type tCase struct { + shards string + action string + expected int + } + testCases := []tCase{ + {"-40", "stop", 1}, + {"40-80", "stop", 1}, + {"-40,40-80", "start", 2}, + } + for _, tc := range testCases { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("Reshard", "--target-keyspace", keyspace, "--workflow", defaultWorkflowName, tc.action, "--shards", tc.shards) + require.NoError(t, err, "failed to %s Reshard workflow on shards %s: %v", tc.action, tc.shards, err) + cnt := gjson.Get(output, "details.#").Int() + require.EqualValuesf(t, tc.expected, cnt, "expected %d shards, got %d for action %s, shards %s", tc.expected, cnt, tc.action, tc.shards) + } + }) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, defaultWorkflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + for _, targetTab := range targetTabs { catchup(t, targetTab, defaultWorkflowName, "Reshard") }