From 2227a5be83d615df3feceff67a49a1aa920b37d5 Mon Sep 17 00:00:00 2001 From: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:43:34 -0500 Subject: [PATCH 1/3] Skip FOR...UPDATE Queries from Warming Reads Signed-off-by: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> --- go/vt/vtgate/engine/route.go | 5 + .../vtgate/engine/route_warming_reads_test.go | 127 ++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 go/vt/vtgate/engine/route_warming_reads_test.go diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index fd206590d9a..3216ec0734f 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -508,6 +508,11 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs return } + // Skip SELECT ... FOR UPDATE queries that cannot run on read-only replicas + if strings.Contains(strings.ToLower(route.Query), "for update") { + return + } + if vcursor.GetWarmingReadsPercent() == 0 || rand.IntN(100) > vcursor.GetWarmingReadsPercent() { return } diff --git a/go/vt/vtgate/engine/route_warming_reads_test.go b/go/vt/vtgate/engine/route_warming_reads_test.go new file mode 100644 index 00000000000..afff47eb13e --- /dev/null +++ b/go/vt/vtgate/engine/route_warming_reads_test.go @@ -0,0 +1,127 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + +type warmingReadsVCursor struct { + *loggingVCursor + warmingReadsPercent int + warmingReadsChannel chan bool + warmingReadsExecuteFunc func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) +} + +func (vc *warmingReadsVCursor) GetWarmingReadsPercent() int { + return vc.warmingReadsPercent +} + +func (vc *warmingReadsVCursor) GetWarmingReadsChannel() chan bool { + return vc.warmingReadsChannel +} + +func (vc *warmingReadsVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { + clone := &warmingReadsVCursor{ + loggingVCursor: vc.loggingVCursor, + warmingReadsPercent: vc.warmingReadsPercent, + warmingReadsChannel: vc.warmingReadsChannel, + warmingReadsExecuteFunc: vc.warmingReadsExecuteFunc, + } + clone.onExecuteMultiShardFn = vc.warmingReadsExecuteFunc + return clone +} + +func TestWarmingReadsSkipsForUpdate(t *testing.T) { + vindex, _ := vindexes.CreateVindex("hash", "", nil) + testCases := []struct { + name string + query string + shouldSkip bool + }{ + { + name: "SELECT FOR UPDATE", + query: "SELECT * FROM users WHERE id = 1 FOR UPDATE", + shouldSkip: true, + }, + { + name: "SELECT FOR UPDATE mixed case", + query: "SELECT * FROM users WHERE id = 1 FoR UpDaTe", + shouldSkip: true, + }, + { + name: "Regular SELECT", + query: "SELECT * FROM users WHERE id = 1", + shouldSkip: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + route := NewRoute( + EqualUnique, + &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + tc.query, + "dummy_select_field", + ) + route.Vindex = vindex.(vindexes.SingleColumn) + route.Values = []evalengine.Expr{ + evalengine.NewLiteralInt(1), + } + + var warmingReadExecuted atomic.Bool + vc := &warmingReadsVCursor{ + loggingVCursor: &loggingVCursor{ + shards: []string{"-20", "20-"}, + results: []*sqltypes.Result{defaultSelectResult}, + }, + warmingReadsPercent: 100, + warmingReadsChannel: make(chan bool, 1), + } + vc.warmingReadsExecuteFunc = func(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) { + warmingReadExecuted.Store(true) + } + + _, err := route.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + + if tc.shouldSkip { + time.Sleep(50 * time.Millisecond) + require.False(t, warmingReadExecuted.Load(), "warming read should not be executed for FOR UPDATE queries") + } else { + require.Eventually(t, func() bool { + return warmingReadExecuted.Load() + }, time.Second, 10*time.Millisecond, "warming read should be executed for regular SELECT queries") + } + }) + } +} From 5bd4297f04a07c03864433ebed55e36be92706cb Mon Sep 17 00:00:00 2001 From: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> Date: Mon, 22 Dec 2025 10:23:38 -0500 Subject: [PATCH 2/3] Update to use AST and remove the update lock rather than skip query entirely Signed-off-by: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> --- go/vt/vtgate/engine/route.go | 45 ++++++++++++++-- .../vtgate/engine/route_warming_reads_test.go | 51 +++++++++++-------- 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 3216ec0734f..7e331bbf3f2 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -508,13 +508,20 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs return } - // Skip SELECT ... FOR UPDATE queries that cannot run on read-only replicas - if strings.Contains(strings.ToLower(route.Query), "for update") { + if vcursor.GetWarmingReadsPercent() == 0 || rand.IntN(100) > vcursor.GetWarmingReadsPercent() { return } - if vcursor.GetWarmingReadsPercent() == 0 || rand.IntN(100) > vcursor.GetWarmingReadsPercent() { - return + // Remove FOR UPDATE locks for warming reads if present + warmingQueries := queries + if modifiedQuery, ok := removeForUpdateLocks(route.Query); ok { + warmingQueries = make([]*querypb.BoundQuery, len(queries)) + for i, query := range queries { + warmingQueries[i] = &querypb.BoundQuery{ + Sql: modifiedQuery, + BindVariables: query.BindVariables, + } + } } replicaVCursor := vcursor.CloneForReplicaWarming(ctx) @@ -532,7 +539,7 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs return } - _, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID) + _, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, warmingQueries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID) if len(errs) > 0 { log.Warningf("Failed to execute warming replica read: %v", errs) } else { @@ -543,3 +550,31 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs log.Warning("Failed to execute warming replica read as pool is full") } } + +func removeForUpdateLocks(query string) (string, bool) { + parser, err := sqlparser.New(sqlparser.Options{}) + if err != nil { + return query, false + } + stmt, err := parser.Parse(query) + if err != nil { + return query, false + } + sel, ok := stmt.(*sqlparser.Select) + if !ok { + return query, false + } + + // Check if this is a FOR UPDATE query + if sel.Lock != sqlparser.ForUpdateLock && + sel.Lock != sqlparser.ForUpdateLockNoWait && + sel.Lock != sqlparser.ForUpdateLockSkipLocked { + return query, false + } + + // Remove the lock clause + sel.Lock = sqlparser.NoLock + + // Convert back to SQL string + return sqlparser.String(sel), true +} diff --git a/go/vt/vtgate/engine/route_warming_reads_test.go b/go/vt/vtgate/engine/route_warming_reads_test.go index afff47eb13e..a0df8096bc3 100644 --- a/go/vt/vtgate/engine/route_warming_reads_test.go +++ b/go/vt/vtgate/engine/route_warming_reads_test.go @@ -61,24 +61,34 @@ func (vc *warmingReadsVCursor) CloneForReplicaWarming(ctx context.Context) VCurs func TestWarmingReadsSkipsForUpdate(t *testing.T) { vindex, _ := vindexes.CreateVindex("hash", "", nil) testCases := []struct { - name string - query string - shouldSkip bool + name string + query string + expectedWarmingQuery string }{ { - name: "SELECT FOR UPDATE", - query: "SELECT * FROM users WHERE id = 1 FOR UPDATE", - shouldSkip: true, + name: "SELECT FOR UPDATE", + query: "SELECT * FROM users WHERE id = 1 FOR UPDATE", + expectedWarmingQuery: "select * from users where id = 1", }, { - name: "SELECT FOR UPDATE mixed case", - query: "SELECT * FROM users WHERE id = 1 FoR UpDaTe", - shouldSkip: true, + name: "SELECT FOR UPDATE mixed case", + query: "SELECT * FROM users WHERE id = 1 FoR UpDaTe", + expectedWarmingQuery: "select * from users where id = 1", }, { - name: "Regular SELECT", - query: "SELECT * FROM users WHERE id = 1", - shouldSkip: false, + name: "SELECT FOR UPDATE with extra spaces", + query: "SELECT * FROM users WHERE id = 1 FOR UPDATE", + expectedWarmingQuery: "select * from users where id = 1", + }, + { + name: "SELECT FOR UPDATE with comment", + query: "SELECT * FROM users WHERE id = 1 FOR /* comment */ UPDATE", + expectedWarmingQuery: "select * from users where id = 1", + }, + { + name: "Regular SELECT", + query: "SELECT * FROM users WHERE id = 1", + expectedWarmingQuery: "SELECT * FROM users WHERE id = 1", }, } @@ -99,6 +109,7 @@ func TestWarmingReadsSkipsForUpdate(t *testing.T) { } var warmingReadExecuted atomic.Bool + var capturedQuery string vc := &warmingReadsVCursor{ loggingVCursor: &loggingVCursor{ shards: []string{"-20", "20-"}, @@ -108,20 +119,20 @@ func TestWarmingReadsSkipsForUpdate(t *testing.T) { warmingReadsChannel: make(chan bool, 1), } vc.warmingReadsExecuteFunc = func(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) { + if len(queries) > 0 { + capturedQuery = queries[0].Sql + } warmingReadExecuted.Store(true) } _, err := route.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) - if tc.shouldSkip { - time.Sleep(50 * time.Millisecond) - require.False(t, warmingReadExecuted.Load(), "warming read should not be executed for FOR UPDATE queries") - } else { - require.Eventually(t, func() bool { - return warmingReadExecuted.Load() - }, time.Second, 10*time.Millisecond, "warming read should be executed for regular SELECT queries") - } + require.Eventually(t, func() bool { + return warmingReadExecuted.Load() + }, time.Second, 10*time.Millisecond, "warming read should be executed") + + require.Equal(t, tc.expectedWarmingQuery, capturedQuery, "warming read query should match expected") }) } } From ee058cff865efc5e191ad66907c5e66aff77d6ab Mon Sep 17 00:00:00 2001 From: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> Date: Mon, 22 Dec 2025 11:28:37 -0500 Subject: [PATCH 3/3] Store the parsed query in the Route to avoid re-parsing for warming reads Signed-off-by: Emily Signorelli <212963194+esignorelli@users.noreply.github.com> --- go/vt/vtgate/engine/route.go | 19 +++++++------------ .../vtgate/engine/route_warming_reads_test.go | 4 ++++ go/vt/vtgate/planbuilder/route.go | 1 + 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 7e331bbf3f2..54653d52b3e 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -60,6 +60,9 @@ type Route struct { // Query specifies the query to be executed. Query string + // QueryStatement is the parsed AST of Query + QueryStatement sqlparser.Statement + // FieldQuery specifies the query to be executed for a GetFieldInfo request. FieldQuery string @@ -514,7 +517,7 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs // Remove FOR UPDATE locks for warming reads if present warmingQueries := queries - if modifiedQuery, ok := removeForUpdateLocks(route.Query); ok { + if modifiedQuery, ok := removeForUpdateLocks(route.QueryStatement); ok { warmingQueries = make([]*querypb.BoundQuery, len(queries)) for i, query := range queries { warmingQueries[i] = &querypb.BoundQuery{ @@ -551,25 +554,17 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs } } -func removeForUpdateLocks(query string) (string, bool) { - parser, err := sqlparser.New(sqlparser.Options{}) - if err != nil { - return query, false - } - stmt, err := parser.Parse(query) - if err != nil { - return query, false - } +func removeForUpdateLocks(stmt sqlparser.Statement) (string, bool) { sel, ok := stmt.(*sqlparser.Select) if !ok { - return query, false + return "", false } // Check if this is a FOR UPDATE query if sel.Lock != sqlparser.ForUpdateLock && sel.Lock != sqlparser.ForUpdateLockNoWait && sel.Lock != sqlparser.ForUpdateLockSkipLocked { - return query, false + return "", false } // Remove the lock clause diff --git a/go/vt/vtgate/engine/route_warming_reads_test.go b/go/vt/vtgate/engine/route_warming_reads_test.go index a0df8096bc3..f0dadeb4c13 100644 --- a/go/vt/vtgate/engine/route_warming_reads_test.go +++ b/go/vt/vtgate/engine/route_warming_reads_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -103,6 +104,9 @@ func TestWarmingReadsSkipsForUpdate(t *testing.T) { tc.query, "dummy_select_field", ) + // Parse and set QueryStatement to match how Routes are created in production + parser, _ := sqlparser.NewTestParser().Parse(tc.query) + route.QueryStatement = parser route.Vindex = vindex.(vindexes.SingleColumn) route.Values = []evalengine.Expr{ evalengine.NewLiteralInt(1), diff --git a/go/vt/vtgate/planbuilder/route.go b/go/vt/vtgate/planbuilder/route.go index 167bfa6e191..e2153e65a6f 100644 --- a/go/vt/vtgate/planbuilder/route.go +++ b/go/vt/vtgate/planbuilder/route.go @@ -29,6 +29,7 @@ import ( func WireupRoute(ctx *plancontext.PlanningContext, eroute *engine.Route, sel sqlparser.SelectStatement) (engine.Primitive, error) { // prepare the queries we will pass down eroute.Query = sqlparser.String(sel) + eroute.QueryStatement = sel buffer := sqlparser.NewTrackedBuffer(sqlparser.FormatImpossibleQuery) node := buffer.WriteNode(sel) eroute.FieldQuery = node.ParsedQuery().Query