Skip to content

Commit

Permalink
diff: split bucket by random if the bucket's count is twice more than chunk-size (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Oct 10, 2019
1 parent adaa902 commit 3b04f08
Show file tree
Hide file tree
Showing 15 changed files with 610 additions and 540 deletions.
2 changes: 1 addition & 1 deletion ddl_checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/ddl-checker"
checker "github.com/pingcap/tidb-tools/pkg/ddl-checker"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog-filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/table-rule-selector"
selector "github.com/pingcap/tidb-tools/pkg/table-rule-selector"
)

// ActionType indicates how to handle matched items
Expand Down
3 changes: 1 addition & 2 deletions pkg/check/table_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (
"fmt"
"strings"

"github.com/pingcap/tidb-tools/pkg/column-mapping"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/column-mapping/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/table-rule-selector"
selector "github.com/pingcap/tidb-tools/pkg/table-rule-selector"
)

var (
Expand Down
52 changes: 24 additions & 28 deletions pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func GetCreateTableSQL(ctx context.Context, db *sql.DB, schemaName string, table

// GetRowCount returns row count of the table.
// If no where condition is specified, it returns the total row count of the table.
func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName string, where string) (int64, error) {
func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName string, where string, args []interface{}) (int64, error) {
/*
select count example result:
mysql> SELECT count(1) cnt from `test`.`itest` where id > 0;
Expand All @@ -179,10 +179,10 @@ func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName s
if len(where) > 0 {
query += fmt.Sprintf(" WHERE %s", where)
}
log.Debug("get row count", zap.String("sql", query))
log.Debug("get row count", zap.String("sql", query), zap.Reflect("args", args))

var cnt sql.NullInt64
err := db.QueryRowContext(ctx, query).Scan(&cnt)
err := db.QueryRowContext(ctx, query, args...).Scan(&cnt)
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -193,20 +193,18 @@ func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName s
return cnt.Int64, nil
}

// GetRandomValues returns some random value and these value's count of a column, just like sampling. Tips: limitArgs is the value in limitRange.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int, limitRange string, limitArgs []interface{}, collation string) ([]string, []int, error) {
// GetRandomValues returns up to num randomly sampled values of the column, sorted by that column. Note: limitArgs holds the argument values bound to the placeholders in limitRange.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int, limitRange string, limitArgs []interface{}, collation string) ([]string, error) {
/*
example:
mysql> SELECT `id`, COUNT(*) count FROM (SELECT `id` FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 ORDER BY RAND() LIMIT 5) rand_tmp GROUP BY `id` ORDER BY `id` COLLATE "latin1_bin";
+------+-------+
| id | count |
+------+-------+
| 1 | 2 |
| 2 | 2 |
| 3 | 1 |
+------+-------+
FIXME: TiDB currently doesn't return random values when `ORDER BY RAND()` is used
mysql> SELECT `id` FROM (SELECT `id`, rand() rand_value FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 ORDER BY rand_value LIMIT 5) rand_tmp ORDER BY `id` COLLATE "latin1_bin";
+------+
| id |
+------+
| 1 |
| 2 |
| 3 |
+------+
*/

if limitRange == "" {
Expand All @@ -217,31 +215,27 @@ func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column
collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
}

randomValue := make([]string, 0, num)
valueCount := make([]int, 0, num)

query := fmt.Sprintf("SELECT %[1]s, COUNT(*) count FROM (SELECT %[1]s FROM %[2]s WHERE %[3]s ORDER BY RAND() LIMIT %[4]d)rand_tmp GROUP BY %[1]s ORDER BY %[1]s%[5]s",
query := fmt.Sprintf("SELECT %[1]s FROM (SELECT %[1]s, rand() rand_value FROM %[2]s WHERE %[3]s ORDER BY rand_value LIMIT %[4]d)rand_tmp ORDER BY %[1]s%[5]s",
escapeName(column), TableName(schemaName, table), limitRange, num, collation)
log.Debug("get random values", zap.String("sql", query), zap.Reflect("args", limitArgs))

rows, err := db.QueryContext(ctx, query, limitArgs...)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
defer rows.Close()

randomValue := make([]string, 0, num)
for rows.Next() {
var value string
var count int
err = rows.Scan(&value, &count)
err = rows.Scan(&value)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
randomValue = append(randomValue, value)
valueCount = append(valueCount, count)
}

return randomValue, valueCount, errors.Trace(rows.Err())
return randomValue, errors.Trace(rows.Err())
}

// GetMinMaxValue returns the min and max values of the given column under the specified limitRange condition.
Expand Down Expand Up @@ -291,6 +285,7 @@ func GetMinMaxValue(ctx context.Context, db *sql.DB, schema, table, column strin
}

func queryTables(ctx context.Context, db *sql.DB, q string) (tables []string, err error) {
log.Debug("query tables", zap.String("query", q))
rows, err := db.QueryContext(ctx, q)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -328,13 +323,13 @@ func GetTables(ctx context.Context, db *sql.DB, schemaName string) (tables []str
| NTEST | BASE TABLE |
+----------------+------------+
*/
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW';", schemaName)
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type != 'VIEW';", escapeName(schemaName))
return queryTables(ctx, db, query)
}

// GetViews returns names of all views in the specified schema
func GetViews(ctx context.Context, db *sql.DB, schemaName string) (tables []string, err error) {
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type = 'VIEW';", schemaName)
query := fmt.Sprintf("SHOW FULL TABLES IN `%s` WHERE Table_Type = 'VIEW';", escapeName(schemaName))
return queryTables(ctx, db, query)
}

Expand Down Expand Up @@ -429,7 +424,7 @@ func GetBucketsInfo(ctx context.Context, db *sql.DB, schema, table string, table
*/
buckets := make(map[string][]Bucket)
query := "SHOW STATS_BUCKETS WHERE db_name= ? AND table_name= ?;"
log.Debug("GetBucketsInfo", zap.String("sql", query))
log.Debug("GetBucketsInfo", zap.String("sql", query), zap.String("schema", schema), zap.String("table", table))

rows, err := db.QueryContext(ctx, query, schema, table)
if err != nil {
Expand Down Expand Up @@ -502,6 +497,7 @@ func AnalyzeValuesFromBuckets(valueString string, cols []*model.ColumnInfo) ([]s
if IsTimeTypeAndNeedDecode(col.Tp) {
value, err := DecodeTimeInBucket(values[i])
if err != nil {
log.Error("analyze values from buckets", zap.String("column", col.Name.O), zap.String("value", values[i]), zap.Error(err))
return nil, errors.Trace(err)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/diff/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func loadChunks(ctx context.Context, db *sql.DB, instanceID, schema, table strin
if err != nil {
return nil, err
}
chunk.updateColumnOffset()
chunks = append(chunks, chunk)
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/diff/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ func (s *testCheckpointSuite) testInitAndGetSummary(c *C, db *sql.DB) {

func (s *testCheckpointSuite) testSaveAndLoadChunk(c *C, db *sql.DB) {
chunk := &ChunkRange{
ID: 1,
Bounds: []*Bound{{Column: "a", Lower: "1", LowerSymbol: ">"}},
Mode: normalMode,
State: successState,
ID: 1,
Bounds: []*Bound{{Column: "a", Lower: "1"}},
State: successState,
columnOffset: map[string]int{"a": 0},
}

err := saveChunk(context.Background(), db, chunk.ID, "target", "test", "checkpoint", "", chunk)
c.Assert(err, IsNil)

newChunk, err := getChunk(context.Background(), db, "target", "test", "checkpoint", chunk.ID)
c.Assert(err, IsNil)
newChunk.updateColumnOffset()
c.Assert(newChunk, DeepEquals, chunk)

chunks, err := loadChunks(context.Background(), db, "target", "test", "checkpoint")
Expand Down
Loading

0 comments on commit 3b04f08

Please sign in to comment.