diff --git a/changelog/24.0/24.0.0/summary.md b/changelog/24.0/24.0.0/summary.md index bf7a89b9959..dbfbdcee695 100644 --- a/changelog/24.0/24.0.0/summary.md +++ b/changelog/24.0/24.0.0/summary.md @@ -6,6 +6,7 @@ - **[Major Changes](#major-changes)** - **[New Support](#new-support)** - [Window function pushdown for sharded keyspaces](#window-function-pushdown) + - [Tablet targeting via USE statement](#tablet-targeting) - **[Minor Changes](#minor-changes)** - **[VTGate](#minor-changes-vtgate)** - [New default for `--legacy-replication-lag-algorithm` flag](#vtgate-new-default-legacy-replication-lag-algorithm) @@ -34,6 +35,24 @@ Previously, all window function queries required single-shard routing, which lim For examples and more details, see the [documentation](https://vitess.io/docs/24.0/reference/compatibility/mysql-compatibility/#window-functions). +#### Tablet targeting via USE statement + +VTGate now supports routing queries to a specific tablet by alias using an extended `USE` statement syntax: + +```sql +USE keyspace:shard@tablet_type|tablet_alias; +``` + +For example, to target a specific replica tablet: + +```sql +USE commerce:-80@replica|zone1-0000000100; +``` + +Once set, all subsequent queries in the session route to the specified tablet until cleared with a standard `USE keyspace` or `USE keyspace@tablet_type` statement. This is useful for debugging, per-tablet monitoring, cache warming, and other operational tasks where targeting a specific tablet is required. + +Note: A shard must be specified when using tablet targeting. Like shard targeting, this bypasses vindex-based routing, so use with care. + ## Minor Changes ### VTGate diff --git a/go/test/endtoend/vtgate/main_test.go b/go/test/endtoend/vtgate/main_test.go index 3acd8481c88..7cff47a1499 100644 --- a/go/test/endtoend/vtgate/main_test.go +++ b/go/test/endtoend/vtgate/main_test.go @@ -72,7 +72,7 @@ func TestMain(m *testing.M) { clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-config-max-result-size", "100", "--queryserver-config-terse-errors") - err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false, clusterInstance.Cell) + err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 2, false, clusterInstance.Cell) if err != nil { return 1 } diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index af70ace03af..01dc2c5ab46 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -17,6 +17,7 @@ limitations under the License. package vtgate import ( + "context" "fmt" "sync/atomic" "testing" @@ -839,6 +840,169 @@ func TestDDLTargeted(t *testing.T) { utils.AssertMatches(t, conn, `select id from ddl_targeted`, `[[INT64(1)]]`) } +// TestTabletTargeting tests tablet-specific routing with USE keyspace:shard@tablet-alias syntax. +// When shard is specified, tablet-specific routing bypasses vindex-based shard resolution. +func TestTabletTargeting(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + instances := make(map[string]map[string][]string) + + for _, ks := range clusterInstance.Keyspaces { + if ks.Name != "ks" { + continue + } + for _, shard := range ks.Shards { + instances[shard.Name] = make(map[string][]string) + for _, tablet := range shard.Vttablets { + instances[shard.Name][tablet.Type] = append(instances[shard.Name][tablet.Type], tablet.Alias) + } + } + } + + require.NotEmpty(t, instances["-80"]["primary"][0], "no PRIMARY tablet found for -80 shard") + require.NotEmpty(t, instances["80-"]["primary"][0], "no PRIMARY tablet found for 80- shard") + require.NotEmpty(t, instances["-80"]["replica"], "no REPLICA tablets found for -80 shard") + require.NotEmpty(t, instances["80-"]["replica"], "no REPLICA tablets found for 80- shard") + + // Insert data that would normally hash to 80- shard, but goes to -80 because of shard targeting + useStmt := fmt.Sprintf("USE `ks:-80@primary|%s`", instances["-80"]["primary"][0]) + utils.Exec(t, conn, useStmt) + utils.Exec(t, conn, "INSERT into t1(id1, id2) values(1, 100), (2, 200)") + // id1=4 hashes to 80-, but we're targeting -80 shard explicitly + utils.Exec(t, conn, "insert into t1(id1, id2) values(4, 400)") + + // Verify the data went to -80 shard (not where vindex would have put it) + utils.Exec(t, conn, "USE `ks:-80`") + utils.AssertMatches(t, conn, "select id1 from t1 where id1 in (1, 2, 4) order by id1", "[[INT64(1)] [INT64(2)] [INT64(4)]]") + + // Verify the data did NOT go to 80- shard (where vindex says id1=4 should be) + utils.Exec(t, conn, "USE `ks:80-`") + utils.AssertIsEmpty(t, conn, "select id1 from t1 where id1=4") + + // Transaction with tablet-specific routing maintains sticky connection + useStmt = fmt.Sprintf("USE `ks:-80@primary|%s`", instances["-80"]["primary"][0]) + utils.Exec(t, conn, useStmt) + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into t1(id1, id2) values(10, 300)") + // Subsequent queries in transaction should go to same tablet + utils.AssertMatches(t, conn, "select id1 from t1 where id1=10", "[[INT64(10)]]") + utils.Exec(t, conn, "commit") + utils.AssertMatches(t, conn, "select id1 from t1 where id1=10", "[[INT64(10)]]") + + // Rollback with tablet-specific routing + useStmt = fmt.Sprintf("USE `ks:-80@primary|%s`", instances["-80"]["primary"][0]) + utils.Exec(t, conn, useStmt) + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into t1(id1, id2) values(20, 500)") + utils.Exec(t, conn, "rollback") + utils.AssertIsEmpty(t, conn, "select id1 from t1 where id1=20") + + // Invalid tablet alias should fail + useStmt = "USE `ks:-80@primary|nonexistent-tablet`" + _, err = conn.ExecuteFetch(useStmt, 1, false) + require.Error(t, err, "query should fail on invalid tablet") + require.Contains(t, err.Error(), "invalid tablet alias in target") + + // Tablet alias without shard should fail + useStmt = fmt.Sprintf("USE `ks@primary|%s`", instances["-80"]["primary"][0]) + _, err = conn.ExecuteFetch(useStmt, 1, false) + require.Error(t, err, "tablet alias must be used with a shard") + + // Clear tablet targeting returns to normal routing + // With normal routing, the query for id1=4 will be sent to the wrong shard (80-), so it won't be found. + // This is expected and demonstrates that vindex routing is back in effect. + utils.Exec(t, conn, "USE ks") + utils.AssertMatches(t, conn, "select id1 from t1 where id1 in (1, 2, 4, 10) order by id1", "[[INT64(1)] [INT64(2)] [INT64(10)]]") + + // Targeting a specific REPLICA tablet allows reads but not writes + replicaAlias := instances["-80"]["replica"][0] + useStmt = fmt.Sprintf("USE `ks:-80@replica|%s`", replicaAlias) + utils.Exec(t, conn, useStmt) + + // Reads should work on replica (wait for replication) + require.Eventually(t, func() bool { + result, err := conn.ExecuteFetch("select id1 from t1 where id1 in (1, 2, 4, 10) order by id1", 10, false) + return err == nil && len(result.Rows) == 4 + }, 15*time.Second, 100*time.Millisecond, "replication did not catch up for first replica read") + + // Writes should fail on replica (replicas are read-only) + _, err = conn.ExecuteFetch("insert into t1(id1, id2) values(99, 999)", 1, false) + require.Error(t, err, "write should fail on replica tablet") + require.Contains(t, err.Error(), "1290") + + // Targeting different REPLICA tablets in the same shard + secondReplicaAlias := instances["-80"]["replica"][1] + useStmt = fmt.Sprintf("USE `ks:-80@replica|%s`", secondReplicaAlias) + utils.Exec(t, conn, useStmt) + + // Should still be able to read from this different replica (wait for replication) + require.Eventually(t, func() bool { + result, err := conn.ExecuteFetch("select id1 from t1 where id1 in (1, 2, 4, 10) order by id1", 10, false) + return err == nil && len(result.Rows) == 4 + }, 15*time.Second, 100*time.Millisecond, "replication did not catch up for second replica read") + + // Writes should still fail + _, err = conn.ExecuteFetch("insert into t1(id1, id2) values(98, 998)", 1, false) + require.Error(t, err, "write should fail on replica tablet") + require.Contains(t, err.Error(), "1290") + + // Write to primary, verify it replicates to replica + // This tests that tablet-specific routing doesn't break replication + useStmt = fmt.Sprintf("USE `ks:-80@primary|%s`", instances["-80"]["primary"][0]) + utils.Exec(t, conn, useStmt) + utils.Exec(t, conn, "insert into t1(id1, id2) values(50, 5000)") + utils.AssertMatches(t, conn, "select id1 from t1 where id1=50", "[[INT64(50)]]") + + // Switch to replica and verify the data replicated + replicaAlias = instances["-80"]["replica"][0] + useStmt = fmt.Sprintf("USE `ks:-80@replica|%s`", replicaAlias) + utils.Exec(t, conn, useStmt) + // Wait for replication to catch up + require.Eventually(t, func() bool { + result, err := conn.ExecuteFetch("select id1 from t1 where id1=50", 1, false) + return err == nil && len(result.Rows) == 1 + }, 15*time.Second, 100*time.Millisecond, "replication did not catch up") + + // Query different replicas and verify different server UUIDs + // This proves we're actually hitting different physical tablets + + // Get server UUID from first replica + useStmt = fmt.Sprintf("USE `ks:-80@replica|%s`", instances["-80"]["replica"][0]) + utils.Exec(t, conn, useStmt) + var uuid1 string + for i := range 5 { + result1 := utils.Exec(t, conn, "SELECT @@server_uuid") + require.NotNil(t, result1) + require.Greater(t, len(result1.Rows), 0) + if i > 0 { + // UUID should be the same across multiple queries to same tablet + require.Equal(t, uuid1, result1.Rows[0][0].ToString()) + } + uuid1 = result1.Rows[0][0].ToString() + } + + // Get server UUID from second replica + useStmt = fmt.Sprintf("USE `ks:-80@replica|%s`", instances["-80"]["replica"][1]) + utils.Exec(t, conn, useStmt) + var uuid2 string + for i := range 5 { + result2 := utils.Exec(t, conn, "SELECT @@server_uuid") + require.NotNil(t, result2) + require.Greater(t, len(result2.Rows), 0) + if i > 0 { + // UUID should be the same across multiple queries to same tablet + require.Equal(t, uuid2, result2.Rows[0][0].ToString()) + } + uuid2 = result2.Rows[0][0].ToString() + } + + // Server UUIDs should be different, proving we're targeting different tablets + require.NotEqual(t, uuid1, uuid2, "different replicas should have different server UUIDs") +} + // TestDynamicConfig tests the dynamic configurations. func TestDynamicConfig(t *testing.T) { t.Run("DiscoveryLowReplicationLag", func(t *testing.T) { diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go index 4a526b434a6..df8bb0b87b4 100644 --- a/go/test/vschemawrapper/vschema_wrapper.go +++ b/go/test/vschemawrapper/vschema_wrapper.go @@ -236,7 +236,7 @@ func (vw *VSchemaWrapper) ShardDestination() key.ShardDestination { } func (vw *VSchemaWrapper) FindTable(tab sqlparser.TableName) (*vindexes.BaseTable, string, topodatapb.TabletType, key.ShardDestination, error) { - destKeyspace, destTabletType, destTarget, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) + destKeyspace, destTabletType, destTarget, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) if err != nil { return nil, destKeyspace, destTabletType, destTarget, err } @@ -248,7 +248,7 @@ func (vw *VSchemaWrapper) FindTable(tab sqlparser.TableName) (*vindexes.BaseTabl } func (vw *VSchemaWrapper) FindView(tab sqlparser.TableName) sqlparser.TableStatement { - destKeyspace, _, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) + destKeyspace, _, _, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) if err != nil { return nil } @@ -256,7 +256,7 @@ func (vw *VSchemaWrapper) FindView(tab sqlparser.TableName) sqlparser.TableState } func (vw *VSchemaWrapper) FindViewTarget(name sqlparser.TableName) (*vindexes.Keyspace, error) { - destKeyspace, _, _, err := topoproto.ParseDestination(name.Qualifier.String(), topodatapb.TabletType_PRIMARY) + destKeyspace, _, _, _, err := topoproto.ParseDestination(name.Qualifier.String(), topodatapb.TabletType_PRIMARY) if err != nil { return nil, err } @@ -336,7 +336,7 @@ func (vw *VSchemaWrapper) IsViewsEnabled() bool { // FindMirrorRule finds the mirror rule for the requested keyspace, table // name, and the tablet type in the VSchema. func (vw *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) { - destKeyspace, destTabletType, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) + destKeyspace, destTabletType, _, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) if err != nil { return nil, err } diff --git a/go/vt/topo/topoproto/destination.go b/go/vt/topo/topoproto/destination.go index 19a4965e62b..1aa00d36404 100644 --- a/go/vt/topo/topoproto/destination.go +++ b/go/vt/topo/topoproto/destination.go @@ -29,16 +29,44 @@ import ( // ParseDestination parses the string representation of a ShardDestination // of the form keyspace:shard@tablet_type. You can use a / instead of a :. -func ParseDestination(targetString string, defaultTabletType topodatapb.TabletType) (string, topodatapb.TabletType, key.ShardDestination, error) { +// It also supports tablet-specific routing with keyspace:shard@tablet_type|tablet-alias +// where tablet-alias is in the format cell-uid (e.g., zone1-0000000100). +// The tablet_type|tablet-alias syntax explicitly specifies both the expected tablet type +// and the specific tablet to route to (e.g., @replica|zone1-0000000100). +func ParseDestination(targetString string, defaultTabletType topodatapb.TabletType) (string, topodatapb.TabletType, key.ShardDestination, *topodatapb.TabletAlias, error) { var dest key.ShardDestination var keyspace string + var tabletAlias *topodatapb.TabletAlias tabletType := defaultTabletType last := strings.LastIndexAny(targetString, "@") if last != -1 { - // No need to check the error. UNKNOWN will be returned on - // error and it will fail downstream. - tabletType, _ = ParseTabletType(targetString[last+1:]) + afterAt := targetString[last+1:] + if afterAt == "" { + return "", defaultTabletType, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "empty tablet type after @") + } + // Check for explicit tablet_type|tablet_alias syntax (e.g., "replica|zone1-0000000100") + if pipeIdx := strings.Index(afterAt, "|"); pipeIdx != -1 { + typeStr := afterAt[:pipeIdx] + aliasStr := afterAt[pipeIdx+1:] + + // Parse the tablet type + parsedTabletType, err := ParseTabletType(typeStr) + if err != nil || parsedTabletType == topodatapb.TabletType_UNKNOWN { + return "", defaultTabletType, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet type in target: %s", typeStr) + } + tabletType = parsedTabletType + + // Parse the tablet alias + alias, err := ParseTabletAlias(aliasStr) + if err != nil { + return "", defaultTabletType, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet alias in target: %s", aliasStr) + } + tabletAlias = alias + } else { + // No pipe: just a tablet type (backward compatible - allow UNKNOWN) + tabletType, _ = ParseTabletType(afterAt) + } targetString = targetString[:last] } last = strings.LastIndexAny(targetString, "/:") @@ -51,29 +79,32 @@ func ParseDestination(targetString string, defaultTabletType topodatapb.TabletTy if last != -1 { rangeEnd := strings.LastIndexAny(targetString, "]") if rangeEnd == -1 { - return keyspace, tabletType, dest, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid key range provided. Couldn't find range end ']'") + return keyspace, tabletType, dest, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid key range provided. Couldn't find range end ']'") } rangeString := targetString[last+1 : rangeEnd] if strings.Contains(rangeString, "-") { // Parse as range keyRange, err := key.ParseShardingSpec(rangeString) if err != nil { - return keyspace, tabletType, dest, err + return keyspace, tabletType, dest, nil, err } if len(keyRange) != 1 { - return keyspace, tabletType, dest, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "single keyrange expected in %s", rangeString) + return keyspace, tabletType, dest, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "single keyrange expected in %s", rangeString) } dest = key.DestinationExactKeyRange{KeyRange: keyRange[0]} } else { // Parse as keyspace id destBytes, err := hex.DecodeString(rangeString) if err != nil { - return keyspace, tabletType, dest, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "expected valid hex in keyspace id %s", rangeString) + return keyspace, tabletType, dest, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "expected valid hex in keyspace id %s", rangeString) } dest = key.DestinationKeyspaceID(destBytes) } targetString = targetString[:last] } keyspace = targetString - return keyspace, tabletType, dest, nil + if tabletAlias != nil && dest == nil { + return "", defaultTabletType, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet alias must be used with a shard") + } + return keyspace, tabletType, dest, tabletAlias, nil } diff --git a/go/vt/topo/topoproto/destination_test.go b/go/vt/topo/topoproto/destination_test.go index 553f749021b..22737877f53 100644 --- a/go/vt/topo/topoproto/destination_test.go +++ b/go/vt/topo/topoproto/destination_test.go @@ -21,6 +21,8 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/key" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -35,6 +37,7 @@ func TestParseDestination(t *testing.T) { dest key.ShardDestination keyspace string tabletType topodatapb.TabletType + tabletAlias *topodatapb.TabletAlias }{{ targetString: "ks[10-20]@primary", keyspace: "ks", @@ -87,37 +90,75 @@ func TestParseDestination(t *testing.T) { keyspace: "ks", dest: key.DestinationShard("-80"), tabletType: topodatapb.TabletType_PRIMARY, + }, { + targetString: "ks:-80@primary|zone1-0000000100", + keyspace: "ks", + dest: key.DestinationShard("-80"), + tabletType: topodatapb.TabletType_PRIMARY, + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 100}, + }, { + targetString: "ks:-80@replica|zone1-0000000101", + keyspace: "ks", + dest: key.DestinationShard("-80"), + tabletType: topodatapb.TabletType_REPLICA, + tabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 101}, + }, { + targetString: "ks:80-@rdonly|zone2-0000000200", + keyspace: "ks", + dest: key.DestinationShard("80-"), + tabletType: topodatapb.TabletType_RDONLY, + tabletAlias: &topodatapb.TabletAlias{Cell: "zone2", Uid: 200}, }} for _, tcase := range testcases { - if targetKeyspace, targetTabletType, targetDest, _ := ParseDestination(tcase.targetString, topodatapb.TabletType_PRIMARY); !reflect.DeepEqual(targetDest, tcase.dest) || targetKeyspace != tcase.keyspace || targetTabletType != tcase.tabletType { - t.Errorf("ParseDestination(%s) - got: (%v, %v, %v), want (%v, %v, %v)", + if targetKeyspace, targetTabletType, targetDest, targetAlias, _ := ParseDestination(tcase.targetString, topodatapb.TabletType_PRIMARY); !reflect.DeepEqual(targetDest, tcase.dest) || targetKeyspace != tcase.keyspace || targetTabletType != tcase.tabletType || !reflect.DeepEqual(targetAlias, tcase.tabletAlias) { + t.Errorf("ParseDestination(%s) - got: (%v, %v, %v, %v), want (%v, %v, %v, %v)", tcase.targetString, - targetDest, targetKeyspace, targetTabletType, - tcase.dest, + targetDest, + targetAlias, tcase.keyspace, tcase.tabletType, + tcase.dest, + tcase.tabletAlias, ) } } - _, _, _, err := ParseDestination("ks[20-40-60]", topodatapb.TabletType_PRIMARY) - want := "single keyrange expected in 20-40-60" - if err == nil || err.Error() != want { - t.Errorf("executorExec error: %v, want %s", err, want) - } + _, _, _, _, err := ParseDestination("ks[20-40-60]", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "single keyrange expected in 20-40-60") - _, _, _, err = ParseDestination("ks[--60]", topodatapb.TabletType_PRIMARY) - want = "malformed spec: MinKey/MaxKey cannot be in the middle of the spec: \"--60\"" - if err == nil || err.Error() != want { - t.Errorf("executorExec error: %v, want %s", err, want) - } + _, _, _, _, err = ParseDestination("ks[--60]", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "malformed spec: MinKey/MaxKey cannot be in the middle of the spec: \"--60\"") - _, _, _, err = ParseDestination("ks[qrnqorrs]@primary", topodatapb.TabletType_PRIMARY) - want = "expected valid hex in keyspace id qrnqorrs" - if err == nil || err.Error() != want { - t.Errorf("executorExec error: %v, want %s", err, want) - } + _, _, _, _, err = ParseDestination("ks[qrnqorrs]@primary", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "expected valid hex in keyspace id qrnqorrs") + + _, _, _, _, err = ParseDestination("ks@primary|zone1-0000000100", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "tablet alias must be used with a shard") + + // Test invalid tablet type in pipe syntax + _, _, _, _, err = ParseDestination("ks:-80@invalid|zone1-0000000100", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "invalid tablet type in target: invalid") + + // Test invalid tablet alias in pipe syntax + _, _, _, _, err = ParseDestination("ks:-80@primary|invalid-alias", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "invalid tablet alias in target: invalid-alias") + + // Test unknown tablet type in pipe syntax + _, _, _, _, err = ParseDestination("ks:-80@unknown|zone1-0000000100", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "invalid tablet type in target: unknown") + + // Test empty tablet type in pipe syntax + _, _, _, _, err = ParseDestination("ks:-80@|zone1-0000000100", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "invalid tablet type in target: ") + + // Test empty tablet alias in pipe syntax + _, _, _, _, err = ParseDestination("ks:-80@primary|", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "invalid tablet alias in target: ") + + // Test empty string after @ + _, _, _, _, err = ParseDestination("ks@", topodatapb.TabletType_PRIMARY) + require.EqualError(t, err, "empty tablet type after @") } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 3e5841760bf..505966e000c 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1116,7 +1116,7 @@ func (e *Executor) SaveVSchema(vschema *vindexes.VSchema, stats *VSchemaStats) { } // ParseDestinationTarget parses destination target string and sets default keyspace if possible. -func (e *Executor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.ShardDestination, error) { +func (e *Executor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.ShardDestination, *topodatapb.TabletAlias, error) { return econtext.ParseDestinationTarget(targetString, defaultTabletType, e.VSchema()) } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 0c7c1e46b79..a4d3a97b226 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1923,7 +1923,7 @@ func TestParseEmptyTargetSingleKeyspace(t *testing.T) { } r.vschema = altVSchema - destKeyspace, destTabletType, _, _ := r.ParseDestinationTarget("") + destKeyspace, destTabletType, _, _, _ := r.ParseDestinationTarget("") if destKeyspace != KsTestUnsharded || destTabletType != topodatapb.TabletType_PRIMARY { t.Errorf( "parseDestinationTarget(%s): got (%v, %v), want (%v, %v)", @@ -1947,7 +1947,7 @@ func TestParseEmptyTargetMultiKeyspace(t *testing.T) { } r.vschema = altVSchema - destKeyspace, destTabletType, _, _ := r.ParseDestinationTarget("") + destKeyspace, destTabletType, _, _, _ := r.ParseDestinationTarget("") if destKeyspace != "" || destTabletType != topodatapb.TabletType_PRIMARY { t.Errorf( "parseDestinationTarget(%s): got (%v, %v), want (%v, %v)", @@ -1970,7 +1970,7 @@ func TestParseTargetSingleKeyspace(t *testing.T) { } r.vschema = altVSchema - destKeyspace, destTabletType, _, _ := r.ParseDestinationTarget("@replica") + destKeyspace, destTabletType, _, _, _ := r.ParseDestinationTarget("@replica") if destKeyspace != KsTestUnsharded || destTabletType != topodatapb.TabletType_REPLICA { t.Errorf( "parseDestinationTarget(%s): got (%v, %v), want (%v, %v)", diff --git a/go/vt/vtgate/executorcontext/safe_session.go b/go/vt/vtgate/executorcontext/safe_session.go index 2f04e07fe01..954c453817d 100644 --- a/go/vt/vtgate/executorcontext/safe_session.go +++ b/go/vt/vtgate/executorcontext/safe_session.go @@ -65,6 +65,11 @@ type ( logging *ExecuteLogger + // targetTabletAlias is set when using tablet-specific routing via USE keyspace:shard@tablet_type|tablet-alias. + // This causes all queries to route to the specified tablet until cleared. + // Note: This is stored in the Go wrapper, not in the protobuf Session. + targetTabletAlias *topodatapb.TabletAlias + *vtgatepb.Session } @@ -1143,3 +1148,19 @@ func (l *ExecuteLogger) GetLogs() []engine.ExecuteEntry { copy(result, l.entries) return result } + +// SetTargetTabletAlias sets the tablet alias for tablet-specific routing. +// When set, all queries will route to the specified tablet until cleared. +func (session *SafeSession) SetTargetTabletAlias(alias *topodatapb.TabletAlias) { + session.mu.Lock() + defer session.mu.Unlock() + session.targetTabletAlias = alias +} + +// GetTargetTabletAlias returns the current tablet alias for tablet-specific routing, +// or nil if not set. +func (session *SafeSession) GetTargetTabletAlias() *topodatapb.TabletAlias { + session.mu.Lock() + defer session.mu.Unlock() + return session.targetTabletAlias +} diff --git a/go/vt/vtgate/executorcontext/safe_session_test.go b/go/vt/vtgate/executorcontext/safe_session_test.go index 2226575e31b..c3144344586 100644 --- a/go/vt/vtgate/executorcontext/safe_session_test.go +++ b/go/vt/vtgate/executorcontext/safe_session_test.go @@ -205,3 +205,21 @@ func TestTimeZone(t *testing.T) { }) } } + +// TestTargetTabletAlias tests the SetTargetTabletAlias and GetTargetTabletAlias methods. +func TestTargetTabletAlias(t *testing.T) { + session := NewSafeSession(&vtgatepb.Session{}) + + // Test: initially nil + assert.Nil(t, session.GetTargetTabletAlias()) + + // Test: Set and get + alias := &topodatapb.TabletAlias{Cell: "zone1", Uid: 100} + session.SetTargetTabletAlias(alias) + got := session.GetTargetTabletAlias() + assert.Equal(t, alias, got) + + // Test: Clear (set to nil) + session.SetTargetTabletAlias(nil) + assert.Nil(t, session.GetTargetTabletAlias()) +} diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index fc98e1f37c0..8705eb7ffd2 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -208,11 +208,15 @@ func NewVCursorImpl( cfg VCursorConfig, metrics Metrics, ) (*VCursorImpl, error) { - keyspace, tabletType, destination, err := ParseDestinationTarget(safeSession.TargetString, cfg.DefaultTabletType, vschema) + keyspace, tabletType, destination, tabletAlias, err := ParseDestinationTarget(safeSession.TargetString, cfg.DefaultTabletType, vschema) if err != nil { return nil, err } + // Store tablet alias from target string into session + // This ensures tablet-specific routing persists across queries + safeSession.SetTargetTabletAlias(tabletAlias) + var ts *topo.Server // We don't have access to the underlying TopoServer if this vtgate is // filtering keyspaces because we don't have an accurate view of the topo. @@ -533,19 +537,20 @@ func (vc *VCursorImpl) FindViewTarget(name sqlparser.TableName) (*vindexes.Keysp } func (vc *VCursorImpl) parseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.ShardDestination, error) { - return ParseDestinationTarget(targetString, vc.tabletType, vc.vschema) + keyspace, tabletType, dest, _, err := ParseDestinationTarget(targetString, vc.tabletType, vc.vschema) + return keyspace, tabletType, dest, err } // ParseDestinationTarget parses destination target string and provides a keyspace if possible. -func ParseDestinationTarget(targetString string, tablet topodatapb.TabletType, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.ShardDestination, error) { - destKeyspace, destTabletType, dest, err := topoprotopb.ParseDestination(targetString, tablet) +func ParseDestinationTarget(targetString string, tablet topodatapb.TabletType, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.ShardDestination, *topodatapb.TabletAlias, error) { + destKeyspace, destTabletType, dest, tabletAlias, err := topoprotopb.ParseDestination(targetString, tablet) // If the keyspace is not specified, and there is only one keyspace in the VSchema, use that. if destKeyspace == "" && len(vschema.Keyspaces) == 1 { for k := range vschema.Keyspaces { destKeyspace = k } } - return destKeyspace, destTabletType, dest, err + return destKeyspace, destTabletType, dest, tabletAlias, err } func (vc *VCursorImpl) getDualTable() (*vindexes.BaseTable, vindexes.Vindex, string, topodatapb.TabletType, key.ShardDestination, error) { @@ -1027,10 +1032,17 @@ func (vc *VCursorImpl) Session() engine.SessionActions { } func (vc *VCursorImpl) SetTarget(target string) error { - keyspace, tabletType, _, err := topoprotopb.ParseDestination(target, vc.config.DefaultTabletType) + keyspace, tabletType, destination, tabletAlias, err := topoprotopb.ParseDestination(target, vc.config.DefaultTabletType) if err != nil { return err } + + // Tablet targeting must be set before starting a transaction, not during. + if tabletAlias != nil && vc.SafeSession.InTransaction() { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "cannot set tablet target while in a transaction") + } + if _, ok := vc.vschema.Keyspaces[keyspace]; !ignoreKeyspace(keyspace) && !ok { return vterrors.VT05003(keyspace) } @@ -1040,6 +1052,7 @@ func (vc *VCursorImpl) SetTarget(target string) error { } vc.SafeSession.SetTargetString(target) vc.keyspace = keyspace + vc.destination = destination vc.tabletType = tabletType return nil } diff --git a/go/vt/vtgate/executorcontext/vcursor_impl_test.go b/go/vt/vtgate/executorcontext/vcursor_impl_test.go index ed7654a58d4..a5eb7d4c394 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl_test.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl_test.go @@ -393,7 +393,7 @@ func (f fakeExecutor) SetVitessMetadata(ctx context.Context, name, value string) panic("implement me") } -func (f fakeExecutor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.ShardDestination, error) { +func (f fakeExecutor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.ShardDestination, *topodatapb.TabletAlias, error) { // TODO implement me panic("implement me") } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 344b2af3e2b..e9008ef5be4 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -859,6 +859,13 @@ func requireNewQS(err error, target *querypb.Target) bool { // actionInfo looks at the current session, and returns information about what needs to be done for this tablet func actionInfo(ctx context.Context, target *querypb.Target, session *econtext.SafeSession, autocommit bool, txMode vtgatepb.TransactionMode) (*shardActionInfo, *vtgatepb.Session_ShardSession, error) { if !session.InTransaction() && !session.InReservedConn() { + // Check for tablet-specific routing for non-transactional queries + if alias := session.GetTargetTabletAlias(); alias != nil { + return &shardActionInfo{ + actionNeeded: nothing, + alias: alias, + }, nil, nil + } return &shardActionInfo{}, nil, nil } ignoreSession := ctx.Value(engine.IgnoreReserveTxn) @@ -896,6 +903,16 @@ func actionInfo(ctx context.Context, target *querypb.Target, session *econtext.S info.alias = shardSession.TabletAlias info.rowsAffected = shardSession.RowsAffected } + // Set tablet alias for routing if tablet-specific targeting is active + if targetAlias := session.GetTargetTabletAlias(); targetAlias != nil { + if info.alias == nil { + info.alias = targetAlias + } else if !proto.Equal(info.alias, targetAlias) { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "cannot change tablet target mid-transaction: session has %s, target is %s", + topoproto.TabletAliasString(info.alias), topoproto.TabletAliasString(targetAlias)) + } + } return info, shardSession, nil } diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index c09aa84b09d..d9488f4a34f 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -632,3 +632,63 @@ func TestIsConnClosed(t *testing.T) { }) } } + +// TestActionInfoWithTabletAlias tests the actionInfo function with tablet-specific routing. +func TestActionInfoWithTabletAlias(t *testing.T) { + ctx := utils.LeakCheckContext(t) + target := &querypb.Target{ + Keyspace: "ks", + Shard: "-80", + TabletType: topodatapb.TabletType_PRIMARY, + } + tabletAlias := &topodatapb.TabletAlias{Cell: "zone1", Uid: 100} + + t.Run("non-transactional with tablet alias", func(t *testing.T) { + session := econtext.NewSafeSession(&vtgatepb.Session{}) + session.SetTargetTabletAlias(tabletAlias) + + info, shardSession, err := actionInfo(ctx, target, session, false, vtgatepb.TransactionMode_MULTI) + require.NoError(t, err) + assert.Nil(t, shardSession) + assert.Equal(t, nothing, info.actionNeeded) + assert.Equal(t, tabletAlias, info.alias) + }) + + t.Run("transaction begin with tablet alias", func(t *testing.T) { + session := econtext.NewSafeSession(&vtgatepb.Session{ + InTransaction: true, + }) + session.SetTargetTabletAlias(tabletAlias) + + info, shardSession, err := actionInfo(ctx, target, session, false, vtgatepb.TransactionMode_MULTI) + require.NoError(t, err) + assert.Nil(t, shardSession) + assert.Equal(t, begin, info.actionNeeded) + assert.Equal(t, tabletAlias, info.alias) + }) + + t.Run("existing transaction with different tablet alias errors", func(t *testing.T) { + session := econtext.NewSafeSession(&vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{{ + Target: target, + TransactionId: 12345, + TabletAlias: &topodatapb.TabletAlias{Cell: "zone1", Uid: 50}, + }}, + }) + session.SetTargetTabletAlias(tabletAlias) // zone1-100, different from zone1-50 + + _, _, err := actionInfo(ctx, target, session, false, vtgatepb.TransactionMode_MULTI) + require.ErrorContains(t, err, "cannot change tablet target mid-transaction") + }) + + t.Run("no tablet alias - existing behavior", func(t *testing.T) { + session := econtext.NewSafeSession(&vtgatepb.Session{}) + + info, shardSession, err := actionInfo(ctx, target, session, false, vtgatepb.TransactionMode_MULTI) + require.NoError(t, err) + assert.Nil(t, shardSession) + assert.Equal(t, nothing, info.actionNeeded) + assert.Nil(t, info.alias) + }) +} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 3fe35044b31..3a3179588a9 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -558,7 +558,7 @@ func (vtg *VTGate) Execute( prepared bool, ) (newSession *vtgatepb.Session, qr *sqltypes.Result, err error) { // In this context, we don't care if we can't fully parse destination - destKeyspace, destTabletType, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) + destKeyspace, destTabletType, _, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) statsKey := []string{"Execute", destKeyspace, topoproto.TabletTypeLString(destTabletType)} defer vtg.timings.Record(statsKey, time.Now()) @@ -620,7 +620,7 @@ func (vtg *VTGate) ExecuteMulti( // ExecuteBatch executes a batch of queries. func (vtg *VTGate) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) { // In this context, we don't care if we can't fully parse destination - destKeyspace, destTabletType, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) + destKeyspace, destTabletType, _, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) statsKey := []string{"ExecuteBatch", destKeyspace, topoproto.TabletTypeLString(destTabletType)} defer vtg.timings.Record(statsKey, time.Now()) @@ -649,7 +649,7 @@ func (vtg *VTGate) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, // Note we guarantee the callback will not be called concurrently by multiple go routines. func (vtg *VTGate) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { // In this context, we don't care if we can't fully parse destination - destKeyspace, destTabletType, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) + destKeyspace, destTabletType, _, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) statsKey := []string{"StreamExecute", destKeyspace, topoproto.TabletTypeLString(destTabletType)} defer vtg.timings.Record(statsKey, time.Now()) @@ -735,7 +735,7 @@ func (vtg *VTGate) CloseSession(ctx context.Context, session *vtgatepb.Session) // Prepare supports non-streaming prepare statement query with multi shards func (vtg *VTGate) Prepare(ctx context.Context, session *vtgatepb.Session, sql string) (newSession *vtgatepb.Session, fld []*querypb.Field, paramsCount uint16, err error) { // In this context, we don't care if we can't fully parse destination - destKeyspace, destTabletType, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) + destKeyspace, destTabletType, _, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) statsKey := []string{"Prepare", destKeyspace, topoproto.TabletTypeLString(destTabletType)} defer vtg.timings.Record(statsKey, time.Now()) diff --git a/go/vtbench/client.go b/go/vtbench/client.go index 727453f555d..2dd20ca732c 100644 --- a/go/vtbench/client.go +++ b/go/vtbench/client.go @@ -124,7 +124,7 @@ func (c *grpcVttabletConn) connect(ctx context.Context, cp ConnParams) error { }) // parse the "db" into the keyspace/shard target - keyspace, tabletType, dest, err := topoproto.ParseDestination(cp.DB, topodatapb.TabletType_PRIMARY) + keyspace, tabletType, dest, _, err := topoproto.ParseDestination(cp.DB, topodatapb.TabletType_PRIMARY) if err != nil { return err }