plan, partition: support point get plan on hash partition table (ping…
Lingyu Song authored and tiancaiamao committed Jan 10, 2020
1 parent 4ed347c commit b6aca51
Showing 8 changed files with 115 additions and 36 deletions.
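In short, the commit lets the planner build a PointGetPlan for hash-partitioned tables by locating the target partition at plan time, and lets the executors encode keys with that partition's physical ID. A minimal sketch of the idea, pieced together from the diff below (the helper name locateHashPartition is illustrative, not code from the commit; imports mirror the packages the diff already uses):

    // Planner side: for `partition by hash(col)`, the partition index is
    // abs(value) % partition count; its definition carries the physical ID.
    // Mirrors getPartitionInfo in planner/core/point_get_plan.go below.
    func locateHashPartition(pi *model.PartitionInfo, val int64) *model.PartitionDefinition {
        pos := math.Abs(val) % int64(pi.Num) // util/math.Abs on int64
        return &pi.Definitions[pos]
    }

    // Executor side: prefer the partition's physical ID over the logical
    // table ID when building the point-get key.
    tblID := e.tblInfo.ID
    if e.partInfo != nil {
        tblID = e.partInfo.ID
    }
    key := tablecodec.EncodeRowKeyWithHandle(tblID, e.handle)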
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
@@ -102,7 +102,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
dedup := make(map[hack.MutableString]struct{})
keys := make([]kv.Key, 0, len(e.idxVals))
for _, idxVals := range e.idxVals {
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, idxVals)
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, idxVals, e.tblInfo.ID)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
16 changes: 12 additions & 4 deletions executor/point_get.go
@@ -59,6 +59,7 @@ type PointGetExecutor struct {
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
partInfo *model.PartitionDefinition
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
@@ -80,6 +81,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
e.rowDecoder = decoder
e.partInfo = p.PartitionInfo
}

// Open implements the Executor interface.
@@ -111,8 +113,14 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
var tblID int64
if e.partInfo != nil {
tblID = e.partInfo.ID
} else {
tblID = e.tblInfo.ID
}
if e.idxInfo != nil {
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, e.idxVals)
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, e.idxVals, tblID)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
@@ -144,7 +152,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
})
}

key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
key := tablecodec.EncodeRowKeyWithHandle(tblID, e.handle)
val, err := e.get(ctx, key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
@@ -190,7 +198,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) (val []byte, err
return e.snapshot.Get(ctx, key)
}

func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum) (_ []byte, err error) {
func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
for i := range idxVals {
colInfo := tblInfo.Columns[idxInfo.Columns[i].Offset]
@@ -213,7 +221,7 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
if err != nil {
return nil, err
}
return tablecodec.EncodeIndexSeekKey(tblInfo.ID, idxInfo.ID, encodedIdxVals), nil
return tablecodec.EncodeIndexSeekKey(tID, idxInfo.ID, encodedIdxVals), nil
}

func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
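Worth noting (an explanatory aside, not part of the diff): encoded row and index keys are prefixed with a physical table ID, and each partition of a partitioned table stores its data under its own physical ID. That is why both EncodeRowKeyWithHandle and encodeIndexKey above must take the selected partition's ID (tblID) rather than the logical table's ID; otherwise the point get would probe an empty key range.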
1 change: 1 addition & 0 deletions expression/partition_pruner_test.go
@@ -66,6 +66,7 @@ func (s *testSuite2) TestHashPartitionPruner(c *C) {
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t2(id int, a int, b int, primary key(id, a)) partition by hash(id + a) partitions 10;")
tk.MustExec("create table t1(id int primary key, a int, b int) partition by hash(id) partitions 10;")
tk.MustExec("create table t3(id int, a int, b int, primary key(id, a)) partition by hash(id) partitions 10;")

var input []string
var output []struct {
2 changes: 1 addition & 1 deletion expression/testdata/partition_pruner_in.json
@@ -4,7 +4,7 @@
"cases": [
// Point Select.
"explain select * from t1 where id = 7 and a = 6",
"explain select * from t2 where id = 9 and a = 1",
"explain select * from t3 where id = 9 and a = 1",
"explain select * from t2 where id = 9 and a = -110",
"explain select * from t1 where id = -17",
"explain select * from t2 where id = a and a = b and b = 2",
6 changes: 2 additions & 4 deletions expression/testdata/partition_pruner_out.json
@@ -11,11 +11,9 @@
]
},
{
"SQL": "explain select * from t2 where id = 9 and a = 1",
"SQL": "explain select * from t3 where id = 9 and a = 1",
"Result": [
"IndexLookUp_8 1.00 root ",
"├─IndexScan_6 1.00 cop[tikv] table:t2, partition:p0, index:id, a, range:[9 1,9 1], keep order:false, stats:pseudo",
"└─TableScan_7 1.00 cop[tikv] table:t2, partition:p0, keep order:false, stats:pseudo"
"Point_Get_1 1.00 root table:t3, index:id a, partition:p9"
]
},
{
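The updated expected plan can be checked by hand (an explanatory aside, not part of the diff): t3 is `partition by hash(id) partitions 10`, so `id = 9` maps to partition abs(9) % 10 = 9, i.e. p9, and the whole query collapses into a single Point_Get on that partition instead of the previous IndexLookUp over a pruned partition of t2.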
51 changes: 48 additions & 3 deletions planner/core/point_get_plan.go
@@ -24,13 +24,15 @@ import (
"github.com/pingcap/parser/opcode"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tipb/go-tipb"
)
@@ -44,6 +46,7 @@ type PointGetPlan struct {
schema *expression.Schema
TblInfo *model.TableInfo
IndexInfo *model.IndexInfo
PartitionInfo *model.PartitionDefinition
Handle int64
HandleParam *driver.ParamMarkerExpr
IndexValues []types.Datum
@@ -112,6 +115,9 @@ func (p *PointGetPlan) explainInfo(normalized bool) string {
if p.Lock {
fmt.Fprintf(buffer, ", lock")
}
if p.PartitionInfo != nil {
fmt.Fprintf(buffer, ", partition:%s", p.PartitionInfo.Name.L)
}
return buffer.String()
}

@@ -552,9 +558,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
// Table partition implementation translates LogicalPlan from `DataSource` to
// `Union -> DataSource` in the logical plan optimization pass. Since PointGetPlan
// bypasses the logical plan optimization, it cannot support partitioned tables in
// general; hash-partitioned tables are handled specially below.
if tbl.GetPartitionInfo() != nil {
return nil
}
pi := tbl.GetPartitionInfo()
for _, col := range tbl.Columns {
// Do not handle generated columns.
if col.IsGenerated() {
@@ -570,6 +574,17 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
if pairs == nil {
return nil
}

var partitionInfo *model.PartitionDefinition
if pi != nil {
if pi.Type != model.PartitionTypeHash {
return nil
}
partitionInfo = getPartitionInfo(ctx, tbl, pairs)
if partitionInfo == nil {
return nil
}
}
handlePair, fieldType := findPKHandle(tbl, pairs)
if handlePair.value.Kind() != types.KindNull && len(pairs) == 1 {
schema, names := buildSchemaFromFields(tblName.Schema, tbl, tblAlias, selStmt.Fields.Fields)
@@ -602,6 +617,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
p.Handle = intDatum.GetInt64()
p.UnsignedHandle = mysql.HasUnsignedFlag(fieldType.Flag)
p.HandleParam = handlePair.param
p.PartitionInfo = partitionInfo
return p
}

@@ -628,6 +644,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
p.IndexInfo = idxInfo
p.IndexValues = idxValues
p.IndexValueParams = idxValueParams
p.PartitionInfo = partitionInfo
return p
}
return nil
@@ -986,6 +1003,7 @@ func colInfoToColumn(col *model.ColumnInfo, idx int) *expression.Column {
ID: col.ID,
UniqueID: int64(col.Offset),
Index: idx,
OrigName: col.Name.L,
}
}

@@ -1006,3 +1024,30 @@ func (p *PointGetPlan) findHandleCol() *expression.Column {
}
return handleCol
}

func getPartitionInfo(ctx sessionctx.Context, tbl *model.TableInfo, pairs []nameValuePair) *model.PartitionDefinition {
is := infoschema.GetInfoSchema(ctx)
table, ok := is.TableByID(tbl.ID)
if !ok {
return nil
}
pi := tbl.Partition
if partitionTable, ok := table.(partitionTable); ok {
// PartitionExpr don't need columns and names for hash partition.
partitionExpr, err := partitionTable.PartitionExpr(ctx, nil, nil)
if err != nil {
return nil
}
expr := partitionExpr.OrigExpr
if col, ok := expr.(*ast.ColumnNameExpr); ok {
for _, pair := range pairs {
if col.Name.Name.L == pair.colName {
val := pair.value.GetInt64()
pos := math.Abs(val) % int64(pi.Num)
return &pi.Definitions[pos]
}
}
}
}
return nil
}
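Note that getPartitionInfo only matches a bare-column partition expression (*ast.ColumnNameExpr). For a table such as t2, partitioned by hash(id + a), OrigExpr is a binary operation rather than a column reference, so getPartitionInfo returns nil and tryPointGetPlan falls back to the normal planner path — which appears to be why the point-get test case in the testdata above was moved from t2 to t3.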
9 changes: 8 additions & 1 deletion planner/core/rule_partition_processor.go
@@ -15,6 +15,7 @@ package core
import (
"context"
"errors"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/util"
@@ -165,6 +166,12 @@ func (s *partitionProcessor) pruneHashPartition(ds *DataSource, pi *model.Partit
}
children := make([]LogicalPlan, 0, len(pi.Definitions))
for i := 0; i < len(pi.Definitions); i++ {
// This is for the `table partition (p0,p1)` syntax: when partitions are specified explicitly, only union those partitions.
if len(ds.partitionNames) != 0 {
if !s.findByName(ds.partitionNames, pi.Definitions[i].Name.L) {
continue
}
}
// Not a deep copy.
newDataSource := *ds
newDataSource.baseLogicalPlan = newBaseLogicalPlan(ds.SCtx(), plancodec.TypeTableScan, &newDataSource, ds.blockOffset)
@@ -197,7 +204,7 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {
filterConds := ds.allConds

// Try to locate partition directly for hash partition.
if pi.Type == model.PartitionTypeHash && len(filterConds) > 0 {
if pi.Type == model.PartitionTypeHash {
return s.pruneHashPartition(ds, pi)
}

64 changes: 42 additions & 22 deletions table/tables/partition.go
@@ -20,13 +20,16 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
@@ -122,6 +125,8 @@ type PartitionExpr struct {
Column *expression.Column
Ranges []expression.Expression
UpperBounds []expression.Expression
// OrigExpr is the AST of the partition expression.
OrigExpr ast.ExprNode
// Expr is the hash partition expression.
Expr expression.Expression
}
@@ -207,44 +212,41 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo,
columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) {
var column *expression.Column
// The caller should ensure that the partition info is not nil.
partitionPruneExprs := make([]expression.Expression, 0, len(pi.Definitions))
var buf bytes.Buffer
schema := expression.NewSchema(columns...)
for i := 0; i < int(pi.Num); i++ {
fmt.Fprintf(&buf, "MOD(ABS(%s),(%d))=%d", pi.Expr, pi.Num, i)
exprs, err := expression.ParseSimpleExprsWithNames(ctx, buf.String(), schema, names)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err))
return nil, errors.Trace(err)
}
// Get a hash code in advance to prevent data race afterwards.
exprs[0].HashCode(ctx.GetSessionVars().StmtCtx)
partitionPruneExprs = append(partitionPruneExprs, exprs[0])
buf.Reset()
origExpr, err := parsePartitionExpr(ctx, pi.Expr, schema, names)
if err != nil {
return nil, err
}
exprs, err := expression.ParseSimpleExprsWithNames(ctx, pi.Expr, schema, names)
exprs, err := rewritePartitionExpr(ctx, origExpr, schema, names)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err))
return nil, errors.Trace(err)
}
exprs[0].HashCode(ctx.GetSessionVars().StmtCtx)
if col, ok := exprs[0].(*expression.Column); ok {
exprs.HashCode(ctx.GetSessionVars().StmtCtx)
if col, ok := exprs.(*expression.Column); ok {
column = col
}
return &PartitionExpr{
Column: column,
Expr: exprs[0],
Ranges: partitionPruneExprs,
Column: column,
Expr: exprs,
OrigExpr: origExpr,
}, nil
}

// PartitionExpr returns the partition expression.
func (t *partitionedTable) PartitionExpr(ctx sessionctx.Context, columns []*expression.Column, names types.NameSlice) (*PartitionExpr, error) {
// TODO: a better performance implementation:
// TODO: a better performance implementation for range partition:
// traverse the Expression, find all columns and rewrite them.
return newPartitionExprBySchema(ctx, t.meta, columns, names)
// return newPartitionExprBySchema(ctx, t.meta, columns, names)
pi := t.meta.GetPartitionInfo()
switch pi.Type {
case model.PartitionTypeHash:
return t.partitionExpr, nil
case model.PartitionTypeRange:
return generatePartitionExpr(ctx, pi, columns, names)
}
panic("cannot reach here")
}

func partitionRecordKey(pid int64, handle int64) kv.Key {
@@ -420,3 +422,21 @@ func FindPartitionByName(meta *model.TableInfo, parName string) (int64, error) {
}
return -1, errors.Trace(table.ErrUnknownPartition.GenWithStackByArgs(parName, meta.Name.O))
}

func parsePartitionExpr(ctx sessionctx.Context, exprStr string, schema *expression.Schema, names types.NameSlice) (ast.ExprNode, error) {
exprStr = "select " + exprStr
stmts, warns, err := parser.New().Parse(exprStr, "", "")
for _, warn := range warns {
ctx.GetSessionVars().StmtCtx.AppendWarning(util.SyntaxWarn(warn))
}
if err != nil {
return nil, util.SyntaxWarn(err)
}
fields := stmts[0].(*ast.SelectStmt).Fields.Fields
return fields[0].Expr, nil
}

func rewritePartitionExpr(ctx sessionctx.Context, field ast.ExprNode, schema *expression.Schema, names types.NameSlice) (expression.Expression, error) {
expr, err := expression.RewriteSimpleExprWithNames(ctx, field, schema, names)
return expr, err
}
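An explanatory aside on the generateHashPartitionExpr change above (not part of the diff): the old code built one `MOD(ABS(expr),(N))=i` pruning expression per partition and stored them in Ranges; the new code parses pi.Expr once with parsePartitionExpr — which wraps the string in a `select` so the SQL parser can return its AST — and keeps both the AST (OrigExpr) and the rewritten evaluable form (Expr). The planner's getPartitionInfo can then inspect OrigExpr directly to locate the target hash partition, and PartitionExpr now returns the cached hash-partition expression instead of regenerating it on every call.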
