Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions go/vt/vtgate/planbuilder/operators/info_schema_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ type InfoSchemaRouting struct {
seenPredicates []sqlparser.Expr
}

func (isr *InfoSchemaRouting) extraInfo() string {
var parts []string
if len(isr.SysTableTableSchema) > 0 {
var exprs []string
for _, expr := range isr.SysTableTableSchema {
exprs = append(exprs, sqlparser.String(expr))
}
parts = append(parts, "SysTableTableSchema:["+strings.Join(exprs, ", ")+"]")
}
if len(isr.SysTableTableName) > 0 {
var exprs []string
for k, expr := range isr.SysTableTableName {
exprs = append(exprs, k+"="+sqlparser.String(expr))
}
parts = append(parts, "SysTableTableName:["+strings.Join(exprs, ", ")+"]")
}
return strings.Join(parts, " ")
}

func (isr *InfoSchemaRouting) UpdateRoutingParams(ctx *plancontext.PlanningContext, rp *engine.RoutingParameters) {
rp.SysTableTableSchema = nil
for _, expr := range isr.SysTableTableSchema {
Expand Down Expand Up @@ -81,7 +100,7 @@ func (isr *InfoSchemaRouting) Clone() Routing {

func (isr *InfoSchemaRouting) updateRoutingLogic(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Routing {
isr.seenPredicates = append(isr.seenPredicates, expr)
isTableSchema, bvName, out := extractInfoSchemaRoutingPredicate(ctx, expr)
isTableSchema, bvName, out := extractInfoSchemaRoutingPredicate(ctx, sqlparser.CloneExpr(expr))
if out == nil {
return isr
}
Expand Down Expand Up @@ -201,6 +220,7 @@ func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, rout

// if we have no schema predicates on either side, we can merge if the table info is the same
case len(isrA.SysTableTableSchema) == 0 && len(isrB.SysTableTableSchema) == 0:
isrA.seenPredicates = append(isrA.seenPredicates, isrB.seenPredicates...)
for k, expr := range isrB.SysTableTableName {
if e, found := isrA.SysTableTableName[k]; found && !sqlparser.Equals.Expr(expr, e) {
// schema names are the same, but we have contradicting table names, so we give up
Expand All @@ -212,6 +232,7 @@ func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, rout

// if both sides have the same schema predicate, we can safely merge them
case equalExprs(isrA.SysTableTableSchema, isrB.SysTableTableSchema):
isrA.seenPredicates = append(isrA.seenPredicates, isrB.seenPredicates...)
for k, expr := range isrB.SysTableTableName {
isrA.SysTableTableName[k] = expr
}
Expand All @@ -224,15 +245,12 @@ func tryMergeInfoSchemaRoutings(ctx *plancontext.PlanningContext, routingA, rout
}

func equalExprs(a, b []sqlparser.Expr) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if !sqlparser.Equals.Expr(a[i], b[i]) {
return false
if sqlparser.Equals.Expr(a[i], b[i]) {
return true
}
}
return true
return false
}

var (
Expand Down
18 changes: 6 additions & 12 deletions go/vt/vtgate/planbuilder/operators/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package operators
import (
"fmt"
"slices"
"strings"

"vitess.io/vitess/go/slice"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -101,28 +102,23 @@ func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex
if !ok {
panic(vterrors.VT12001("pushing predicates on UNION where the first SELECT contains * or NEXT"))
}
offsets[ae.ColumnName()] = i
offsets[strings.ToLower(ae.ColumnName())] = i
}

if jp, ok := expr.(*predicates.JoinPredicate); ok {
expr = jp.Current()
ctx.PredTracker.Skip(jp.ID)
}

needsFilter, exprPerSource := u.predicatePerSource(expr, offsets)
if needsFilter {
return newFilter(u, expr)
}

exprPerSource := u.predicatePerSource(expr, offsets)
for i, src := range u.Sources {
u.Sources[i] = src.AddPredicate(ctx, exprPerSource[i])
}

return u
}

func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) (bool, []sqlparser.Expr) {
needsFilter := false
func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) []sqlparser.Expr {
exprPerSource := make([]sqlparser.Expr, len(u.Sources))

for i := range u.Sources {
Expand All @@ -134,9 +130,7 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int)

idx, ok := offsets[col.Name.Lowered()]
if !ok {
needsFilter = true
cursor.StopTreeWalk()
return
panic(vterrors.VT13001(fmt.Sprintf("could not find the column '%s' on the UNION", sqlparser.String(col))))
}

sel := u.GetSelectFor(i)
Expand All @@ -151,7 +145,7 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int)
exprPerSource[i] = predicate
}

return needsFilter, exprPerSource
return exprPerSource
}

func (u *Union) GetSelectFor(source int) *sqlparser.Select {
Expand Down
46 changes: 46 additions & 0 deletions go/vt/vtgate/planbuilder/operators/union_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func mergeUnionInputs(
if res != nil {
return res, exprs
}

case a == infoSchema && b == infoSchema:
return tryMergeUnionInfoSchemaRouting(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct)
}
return nil, nil
}
Expand Down Expand Up @@ -176,6 +179,49 @@ func tryMergeUnionShardedRouting(
return nil, nil
}

func tryMergeUnionInfoSchemaRouting(
ctx *plancontext.PlanningContext,
routeA, routeB *Route,
exprsA, exprsB []sqlparser.SelectExpr,
distinct bool,
) (Operator, []sqlparser.SelectExpr) {
isrA := routeA.Routing.(*InfoSchemaRouting)
isrB := routeB.Routing.(*InfoSchemaRouting)
emptyA := len(isrA.SysTableTableName) == 0 && len(isrA.SysTableTableSchema) == 0
emptyB := len(isrB.SysTableTableName) == 0 && len(isrB.SysTableTableSchema) == 0

switch {
// if either side has no predicates to help us route, we can merge them
case emptyA:
return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, isrB, nil)
case emptyB:
return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, isrA, nil)

// if we have no schema predicates on either side, we can merge if the table info is compatible
case len(isrA.SysTableTableSchema) == 0 && len(isrB.SysTableTableSchema) == 0:
isrA.seenPredicates = append(isrA.seenPredicates, isrB.seenPredicates...)
for k, expr := range isrB.SysTableTableName {
if e, found := isrA.SysTableTableName[k]; found && !sqlparser.Equals.Expr(expr, e) {
// contradicting table names, give up
return nil, nil
}
isrA.SysTableTableName[k] = expr
}
return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, isrA, nil)

// if both sides have the same schema predicate, we can safely merge them
case equalExprs(isrA.SysTableTableSchema, isrB.SysTableTableSchema):
isrA.seenPredicates = append(isrA.seenPredicates, isrB.seenPredicates...)
for k, expr := range isrB.SysTableTableName {
isrA.SysTableTableName[k] = expr
}
return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, isrA, nil)

default:
return nil, nil
}
}

func createMergedUnion(
ctx *plancontext.PlanningContext,
lhsRoute, rhsRoute *Route,
Expand Down
Loading
Loading