Skip to content

Commit

Permalink
test: make test TestTopSQLCPUProfile stable by mock high cpu usage (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Jun 25, 2021
1 parent 8434069 commit 9e8bb50
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 50 deletions.
106 changes: 56 additions & 50 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,8 +1202,11 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
}()

c.Assert(failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`), IsNil)
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")
err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")
c.Assert(err, IsNil)
err = failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL")
c.Assert(err, IsNil)
}()

Expand Down Expand Up @@ -1233,8 +1236,8 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
{sql: "update t set b=a where b is null limit 1;", planRegexp: ".*Limit.*TableReader.*"},
{sql: "delete from t where b = a limit 2;", planRegexp: ".*Limit.*TableReader.*"},
{sql: "replace into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""},
{sql: "select * from t use index(idx) where a<10000000;", planRegexp: ".*IndexLookUp.*"},
{sql: "select * from t ignore index(idx) where a>0;", planRegexp: ".*TableReader.*"},
{sql: "select * from t use index(idx) where a<10;", planRegexp: ".*IndexLookUp.*"},
{sql: "select * from t ignore index(idx) where a>1000000000;", planRegexp: ".*TableReader.*"},
{sql: "select /*+ HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.a=t2.a where t1.b is not null;", planRegexp: ".*HashJoin.*"},
{sql: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t2.a=t1.a where t1.b is not null;", planRegexp: ".*IndexHashJoin.*"},
{sql: "select * from t where a=1;", planRegexp: ".*Point_Get.*"},
Expand All @@ -1257,6 +1260,38 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
})
}

timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
checkFn := func(sql, planRegexp string) {
c.Assert(timeoutCtx.Err(), IsNil)
commentf := Commentf("sql: %v", sql)
stats := collector.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0)
// since 1 sql may has many plan, check `len(stats) > 0` instead of `len(stats) == 1`.
c.Assert(len(stats) > 0, IsTrue, commentf)

for _, s := range stats {
sqlStr := collector.GetSQL(s.SQLDigest)
encodedPlan := collector.GetPlan(s.PlanDigest)
// Normalize the user SQL before check.
normalizedSQL := parser.Normalize(sql)
c.Assert(sqlStr, Equals, normalizedSQL, commentf)
// decode plan before check.
normalizedPlan, err := plancodec.DecodeNormalizedPlan(encodedPlan)
c.Assert(err, IsNil)
// remove '\n' '\t' before do regexp match.
normalizedPlan = strings.Replace(normalizedPlan, "\n", " ", -1)
normalizedPlan = strings.Replace(normalizedPlan, "\t", " ", -1)
c.Assert(normalizedPlan, Matches, planRegexp, commentf)
}
}
// Wait the top sql collector to collect profile data.
collector.WaitCollectCnt(1)
// Check result of test case 1.
for _, ca := range cases1 {
checkFn(ca.sql, ca.planRegexp)
ca.cancel()
}

// Test case 2: prepare/execute sql
cases2 := []struct {
prepare string
Expand All @@ -1269,8 +1304,8 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
{prepare: "update t1 set b=a where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"},
{prepare: "delete from t1 where b = a limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"},
{prepare: "replace into t1 (b) values (?);", args: []interface{}{1}, planRegexp: ""},
{prepare: "select * from t1 use index(idx) where a<?;", args: []interface{}{10000000}, planRegexp: ".*IndexLookUp.*"},
{prepare: "select * from t1 ignore index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*TableReader.*"},
{prepare: "select * from t1 use index(idx) where a<?;", args: []interface{}{10}, planRegexp: ".*IndexLookUp.*"},
{prepare: "select * from t1 ignore index(idx) where a>?;", args: []interface{}{1000000000}, planRegexp: ".*TableReader.*"},
{prepare: "select /*+ HASH_JOIN(t1, t2) */ * from t1 t1 join t1 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*HashJoin.*"},
{prepare: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t1 t1 join t1 t2 on t2.a=t1.a where t1.b is not null;", args: nil, planRegexp: ".*IndexHashJoin.*"},
{prepare: "select * from t1 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"},
Expand All @@ -1297,6 +1332,13 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
}
})
}
// Wait the top sql collector to collect profile data.
collector.WaitCollectCnt(1)
// Check result of test case 2.
for _, ca := range cases2 {
checkFn(ca.prepare, ca.planRegexp)
ca.cancel()
}

// Test case 3: prepare, execute stmt using @val...
cases3 := []struct {
Expand All @@ -1309,8 +1351,8 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
{prepare: "update t2 set b=a where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"},
{prepare: "delete from t2 where b = a limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"},
{prepare: "replace into t2 (b) values (?);", args: []interface{}{1}, planRegexp: ""},
{prepare: "select * from t2 use index(idx) where a<?;", args: []interface{}{10000000}, planRegexp: ".*IndexLookUp.*"},
{prepare: "select * from t2 ignore index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*TableReader.*"},
{prepare: "select * from t2 use index(idx) where a<?;", args: []interface{}{10}, planRegexp: ".*IndexLookUp.*"},
{prepare: "select * from t2 ignore index(idx) where a>?;", args: []interface{}{1000000000}, planRegexp: ".*TableReader.*"},
{prepare: "select /*+ HASH_JOIN(t1, t2) */ * from t2 t1 join t2 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*HashJoin.*"},
{prepare: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t2 t1 join t2 t2 on t2.a=t1.a where t1.b is not null;", args: nil, planRegexp: ".*IndexHashJoin.*"},
{prepare: "select * from t2 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"},
Expand Down Expand Up @@ -1354,30 +1396,10 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {

// Wait the top sql collector to collect profile data.
collector.WaitCollectCnt(1)

timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
checkFn := func(sql, planRegexp string) {
c.Assert(timeoutCtx.Err(), IsNil)
commentf := Commentf("sql: %v", sql)
stats := collector.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0)
// since 1 sql may has many plan, check `len(stats) > 0` instead of `len(stats) == 1`.
c.Assert(len(stats) > 0, IsTrue, commentf)

for _, s := range stats {
sqlStr := collector.GetSQL(s.SQLDigest)
encodedPlan := collector.GetPlan(s.PlanDigest)
// Normalize the user SQL before check.
normalizedSQL := parser.Normalize(sql)
c.Assert(sqlStr, Equals, normalizedSQL, commentf)
// decode plan before check.
normalizedPlan, err := plancodec.DecodeNormalizedPlan(encodedPlan)
c.Assert(err, IsNil)
// remove '\n' '\t' before do regexp match.
normalizedPlan = strings.Replace(normalizedPlan, "\n", " ", -1)
normalizedPlan = strings.Replace(normalizedPlan, "\t", " ", -1)
c.Assert(normalizedPlan, Matches, planRegexp, commentf)
}
// Check result of test case 3.
for _, ca := range cases3 {
checkFn(ca.prepare, ca.planRegexp)
ca.cancel()
}

// Test case 4: transaction commit
Expand All @@ -1388,25 +1410,6 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) {
db.Exec("insert into t () values (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),()")
db.Exec("commit")
})

// Check result of test case 1.
for _, ca := range cases1 {
checkFn(ca.sql, ca.planRegexp)
ca.cancel()
}

// Check result of test case 2.
for _, ca := range cases2 {
checkFn(ca.prepare, ca.planRegexp)
ca.cancel()
}

// Check result of test case 3.
for _, ca := range cases3 {
checkFn(ca.prepare, ca.planRegexp)
ca.cancel()
}

// Check result of test case 4.
checkFn("commit", "")
}
Expand All @@ -1427,11 +1430,14 @@ func (ts *tidbTestTopSQLSuite) TestTopSQLAgent(c *C) {

c.Assert(failpoint.Enable("github.com/pingcap/tidb/util/topsql/reporter/resetTimeoutForTest", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL", `return(true)`), IsNil)
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/util/topsql/reporter/resetTimeoutForTest")
c.Assert(err, IsNil)
err = failpoint.Disable("github.com/pingcap/tidb/domain/skipLoadSysVarCacheLoop")
c.Assert(err, IsNil)
err = failpoint.Disable("github.com/pingcap/tidb/util/topsql/mockHighLoadForEachSQL")
c.Assert(err, IsNil)
}()

dbt := &DBTest{c, db}
Expand Down
36 changes: 36 additions & 0 deletions util/topsql/topsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ package topsql
import (
"context"
"runtime/pprof"
"strings"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/topsql/reporter"
"github.com/pingcap/tidb/util/topsql/tracecpu"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -66,6 +71,37 @@ func AttachSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser.
} else {
linkPlanTextWithDigest(planDigestBytes, normalizedPlan)
}
failpoint.Inject("mockHighLoadForEachSQL", func(val failpoint.Value) {
// In integration test, some SQL run very fast that Top SQL pprof profile unable to sample data of those SQL,
// So need mock some high cpu load to make sure pprof profile successfully samples the data of those SQL.
// Attention: Top SQL pprof profile unable to sample data of those SQL which run very fast, this behavior is expected.
// The integration test was just want to make sure each type of SQL will be set goroutine labels and and can be collected.
if val.(bool) {
lowerSQL := strings.ToLower(normalizedSQL)
if strings.Contains(lowerSQL, "mysql") {
failpoint.Return(ctx)
}
isDML := false
for _, prefix := range []string{"insert", "update", "delete", "load", "replace", "select", "commit"} {
if strings.HasPrefix(lowerSQL, prefix) {
isDML = true
break
}
}
if !isDML {
failpoint.Return(ctx)
}
start := time.Now()
logutil.BgLogger().Info("attach SQL info", zap.String("sql", normalizedSQL), zap.Bool("has-plan", len(normalizedPlan) > 0))
for {
if time.Since(start) > 11*time.Millisecond {
break
}
for i := 0; i < 10e5; i++ {
}
}
}
})
return ctx
}

Expand Down
5 changes: 5 additions & 0 deletions util/topsql/tracecpu/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

"github.com/pingcap/parser"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql/tracecpu"
"github.com/uber-go/atomic"
"go.uber.org/zap"
)

// TopSQLCollector uses for testing.
Expand Down Expand Up @@ -64,6 +66,9 @@ func (c *TopSQLCollector) Collect(ts uint64, stats []tracecpu.SQLCPUTimeRecord)
c.sqlStatsMap[hash] = stats
}
stats.CPUTimeMs += stmt.CPUTimeMs
logutil.BgLogger().Info("mock top sql collector collected sql",
zap.String("sql", c.sqlMap[string(stmt.SQLDigest)]),
zap.Bool("has-plan", len(c.planMap[string(stmt.PlanDigest)]) > 0))
}
}

Expand Down

0 comments on commit 9e8bb50

Please sign in to comment.