diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 622884fe4e365..c0a05f0452ead 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -842,11 +842,11 @@ func (tcc *TxnCompilerContext) GetPrimaryKeyDef(dbName string, tableName string, } func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snapshot) (*pb.StatsInfo, error) { - stats := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) + statser := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) start := time.Now() defer func() { v2.TxnStatementStatsDurationHistogram.Observe(time.Since(start).Seconds()) - stats.AddBuildPlanStatsConsumption(time.Since(start)) + statser.AddBuildPlanStatsConsumption(time.Since(start)) }() dbName := obj.GetSchemaName() @@ -901,47 +901,26 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { - crs := new(perfcounter.CounterSet) statsInfo = plan2.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := tcc.getRelation(dbName, partitionTable, sub, snapshot) if err != nil { return cached, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return cached, err } statsInfo.Merge(parStats) } - - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) - } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return cached, err } - - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) } if statsInfo != nil { @@ -958,6 +937,12 @@ func (tcc *TxnCompilerContext) UpdateStatsInCache(tid uint64, s *pb.StatsInfo) { // statsInCache get the *pb.StatsInfo from session cache. If the info is nil, just return nil and false, // else, check if the info needs to be updated. func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, table engine.Relation, snapshot *plan2.Snapshot) (*pb.StatsInfo, bool) { + statser := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) + start := time.Now() + defer func() { + statser.AddStatsStatsInCacheDuration(time.Since(start)) + }() + s := tcc.GetStatsCache().GetStatsInfo(table.GetTableID(ctx), true) if s == nil { return nil, false diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 1c7e3c566f1ad..eb38d9cc43cf2 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -2013,10 +2013,13 @@ func buildPlan(reqCtx context.Context, ses FeSession, ctx plan2.CompilerContext, v2.TxnStatementBuildPlanDurationHistogram.Observe(cost.Seconds()) }() - stats := statistic.StatsInfoFromContext(reqCtx) + // NOTE: The context used by buildPlan comes from the CompilerContext object + planContext := ctx.GetContext() + stats := statistic.StatsInfoFromContext(planContext) stats.PlanStart() crs := new(perfcounter.CounterSet) - reqCtx = perfcounter.AttachBuildPlanMarkKey(reqCtx, crs) + planContext = perfcounter.AttachBuildPlanMarkKey(planContext, crs) + ctx.SetContext(planContext) defer func() { stats.AddBuildPlanS3Request(statistic.S3Request{ List: crs.FileService.S3.List.Load(), @@ -3756,12 +3759,14 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt int64(statsInfo.PlanStage.PlanDuration) + int64(statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + - statsInfo.PrepareRunStage.CompilePreRunOnceDuration - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) if totalTime < 0 { if !h.isInternalSubStmt { - ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d) = %d]", + ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d) = %d]", uuid.UUID(h.stmt.StatementID).String(), h.stmt.StatementType, statsInfo.ParseStage.ParseDuration, @@ -3770,6 +3775,7 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt operatorTimeConsumed, statsInfo.PrepareRunStage.ScopePrepareDuration+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption, totalTime, diff --git a/pkg/pb/statsinfo/statsinfo.go b/pkg/pb/statsinfo/statsinfo.go index 1967584ed4517..f336c50fb8d07 100644 --- a/pkg/pb/statsinfo/statsinfo.go +++ b/pkg/pb/statsinfo/statsinfo.go @@ -15,11 +15,20 @@ package statsinfo import ( + "context" "math" "golang.org/x/exp/constraints" ) +// StatsInfoKeyWithContext associates a statistics key with a context. +// This struct is used to tie a statistics key (Key) with the context (Ctx) during an operation, +// allowing context-related actions and management while handling statistics information. +type StatsInfoKeyWithContext struct { + Ctx context.Context + Key StatsInfoKey +} + func (sc *StatsInfo) NeedUpdate(currentApproxObjNum int64) bool { if sc.ApproxObjectNumber == 0 || sc.AccurateObjectNumber == 0 { return true diff --git a/pkg/perfcounter/context.go b/pkg/perfcounter/context.go index cccb5e0e49318..15d8c1e864d58 100644 --- a/pkg/perfcounter/context.go +++ b/pkg/perfcounter/context.go @@ -134,3 +134,22 @@ type ExecPipelineMarkKey struct{} func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Context { return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) } + +// ------------------------------------------------------------------------------------------------ +type CalcTableStatsKey struct{} + +func AttachCalcTableStatsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableStatsKey{}, true) +} + +type CalcTableSizeKey struct{} + +func AttachCalcTableSizeKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableSizeKey{}, true) +} + +type CalcTableRowsKey struct{} + +func AttachCalcTableRowsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableRowsKey{}, true) +} diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index e4696a3e0fa1f..721714902871a 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -511,25 +511,26 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option.Analyze { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -538,7 +539,18 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsInCacheDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index bca4c3b1765d2..c7f89d15d3de0 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -117,6 +118,12 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er } func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { + stats := statistic.StatsInfoFromContext(c.GetContext()) + start := time.Now() + defer func() { + stats.AddBuildPlanStatsConsumption(time.Since(start)) + }() + dbName := obj.GetSchemaName() tableName := obj.GetObjName() @@ -147,17 +154,16 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* } } var statsInfo *pb.StatsInfo - stats := statistic.StatsInfoFromContext(ctx) // This is a partition table. if partitionInfo != nil { - crs := new(perfcounter.CounterSet) statsInfo = plan.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := c.getRelation(dbName, partitionTable, snapshot) if err != nil { return nil, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return nil, err @@ -165,31 +171,12 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* statsInfo.Merge(parStats) } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return nil, err } - - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) } return statsInfo, nil } diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 9384f79191a24..4ae57b193ff28 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -88,25 +88,26 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option == AnalyzeOption { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -115,7 +116,18 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsInCacheDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") diff --git a/pkg/sql/plan/function/func_mo_explain_phy_test.go b/pkg/sql/plan/function/func_mo_explain_phy_test.go index 11dea2fb56550..9dc1b06d053a7 100644 --- a/pkg/sql/plan/function/func_mo_explain_phy_test.go +++ b/pkg/sql/plan/function/func_mo_explain_phy_test.go @@ -921,7 +921,7 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 CPU Usage: - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-IOAccess(214917833)-IOMerge(0) + - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) Physical Plan Deployment: LOCAL SCOPES: Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) @@ -978,11 +978,15 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 CPU Usage: - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-IOAccess(214917833)-IOMerge(0) + - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) Query Build Plan Stage: - CPU Time: 649910ns - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 + - Build Plan Duration: 649910ns - Call Stats Duration: 3457758ns + - Call StatsInCache Duration: 0ns + - Call Stats IO Consumption: 0ns + - Call Stats S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 Query Compile Stage: - CPU Time: 299370ns - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 2cfd8e2f3d7c6..39ad628a5c747 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -290,13 +290,15 @@ type StatsInfo struct { // Planning Phase Statistics PlanStage struct { - PlanDuration time.Duration `json:"PlanDuration"` - PlanStartTime time.Time `json:"PlanStartTime"` - BuildPlanS3Request S3Request `json:"BuildPlanS3Request"` - BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` + PlanDuration time.Duration `json:"PlanDuration"` + PlanStartTime time.Time `json:"PlanStartTime"` + BuildPlanS3Request S3Request `json:"BuildPlanS3Request"` + BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. - BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns - BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns + BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` + BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns + BuildPlanStatsInCacheDuration int64 `json:"BuildPlanStatsInCacheDuration"` // unit: ns + BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } // Compile phase statistics @@ -514,6 +516,20 @@ func (stats *StatsInfo) AddBuildPlanStatsConsumption(d time.Duration) { atomic.AddInt64(&stats.PlanStage.BuildPlanStatsDuration, int64(d)) } +func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d)) +} + +func (stats *StatsInfo) AddStatsStatsInCacheDuration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.BuildPlanStatsInCacheDuration, int64(d)) +} + func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { if stats == nil { return diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index bba9ec494928c..d26164e783cb7 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -31,9 +31,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/query" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/queryservice/client" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) @@ -142,7 +144,7 @@ func WithUpdateWorkerFactor(f int) GlobalStatsOption { } // WithStatsUpdater set the update function to update stats info. -func WithStatsUpdater(f func(pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { +func WithStatsUpdater(f func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { return func(s *GlobalStats) { s.statsUpdater = f } @@ -174,7 +176,7 @@ type GlobalStats struct { // TODO(volgariver6): add metrics of the chan length. tailC chan *logtail.TableLogtail - updateC chan pb.StatsInfoKey + updateC chan pb.StatsInfoKeyWithContext updatingMu struct { sync.Mutex @@ -214,7 +216,7 @@ type GlobalStats struct { // statsUpdate is the function which updates the stats info. // If it is nil, set it to doUpdate. - statsUpdater func(pb.StatsInfoKey, *pb.StatsInfo) bool + statsUpdater func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool // for test only currently. approxObjectNumUpdater func() int64 } @@ -226,7 +228,7 @@ func NewGlobalStats( ctx: ctx, engine: e, tailC: make(chan *logtail.TableLogtail, 10000), - updateC: make(chan pb.StatsInfoKey, 3000), + updateC: make(chan pb.StatsInfoKeyWithContext, 3000), logtailUpdate: newLogtailUpdate(), tableLogtailCounter: make(map[pb.StatsInfoKey]int64), KeyRouter: keyRouter, @@ -268,17 +270,35 @@ func (gs *GlobalStats) checkTriggerCond(key pb.StatsInfoKey, entryNum int64) boo } func (gs *GlobalStats) PrefetchTableMeta(ctx context.Context, key pb.StatsInfoKey) bool { - return gs.triggerUpdate(key, false) + wrapkey := pb.StatsInfoKeyWithContext{ + Ctx: ctx, + Key: key, + } + return gs.triggerUpdate(wrapkey, false) } func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo { gs.mu.Lock() defer gs.mu.Unlock() + + wrapkey := pb.StatsInfoKeyWithContext{ + Ctx: ctx, + Key: key, + } + info, ok := gs.mu.statsInfoMap[key] if ok && info != nil { return info } + if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok { + stats := statistic.StatsInfoFromContext(ctx) + start := time.Now() + defer func() { + stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + }() + } + // Get stats info from remote node. if gs.KeyRouter != nil { client := gs.engine.qc @@ -314,7 +334,7 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) // If the trigger condition is not satisfied, the stats will not be updated // for long time. So we trigger the update here to get the stats info as soon // as possible. - gs.triggerUpdate(key, true) + gs.triggerUpdate(wrapkey, true) }() info, ok = gs.mu.statsInfoMap[key] @@ -375,7 +395,7 @@ func (gs *GlobalStats) consumeWorker(ctx context.Context) { return case tail := <-gs.tailC: - gs.consumeLogtail(tail) + gs.consumeLogtail(ctx, tail) } } } @@ -396,7 +416,7 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { } } -func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { +func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyWithContext, force bool) bool { if force { gs.updateC <- key v2.StatsTriggerForcedCounter.Add(1) @@ -412,15 +432,21 @@ func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { } } -func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { +func (gs *GlobalStats) consumeLogtail(ctx context.Context, tail *logtail.TableLogtail) { key := pb.StatsInfoKey{ AccId: tail.Table.AccId, DatabaseID: tail.Table.DbId, TableID: tail.Table.TbId, } + + wrapkey := pb.StatsInfoKeyWithContext{ + Ctx: ctx, + Key: key, + } + if len(tail.CkpLocation) > 0 { if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } else if tail.Table != nil { var triggered bool @@ -428,7 +454,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if logtailreplay.IsMetaEntry(cmd.TableName) { triggered = true if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } break } @@ -441,7 +467,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if !triggered && gs.checkTriggerCond(key, gs.tableLogtailCounter[key]) { gs.tableLogtailCounter[key] = 0 if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } } @@ -600,38 +626,51 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) { }) } -func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) { - if !gs.shouldUpdate(key) { +func (gs *GlobalStats) updateTableStats(warpKey pb.StatsInfoKeyWithContext) { + statser := statistic.StatsInfoFromContext(warpKey.Ctx) + crs := new(perfcounter.CounterSet) + + if !gs.shouldUpdate(warpKey.Key) { return } // wait until the table's logtail has been updated. - gs.waitLogtailUpdated(key.TableID) + gs.waitLogtailUpdated(warpKey.Key.TableID) // updated is used to mark that the stats info is updated. var updated bool stats := plan2.NewStatsInfo() + + newCtx := perfcounter.AttachS3RequestKey(warpKey.Ctx, crs) if gs.statsUpdater != nil { - updated = gs.statsUpdater(key, stats) - } + updated = gs.statsUpdater(newCtx, warpKey.Key, stats) + } + statser.AddBuildPlanStatsS3Request(statistic.S3Request{ + List: crs.FileService.S3.List.Load(), + Head: crs.FileService.S3.Head.Load(), + Put: crs.FileService.S3.Put.Load(), + Get: crs.FileService.S3.Get.Load(), + Delete: crs.FileService.S3.Delete.Load(), + DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + }) gs.mu.Lock() defer gs.mu.Unlock() if updated { - gs.mu.statsInfoMap[key] = stats - gs.broadcastStats(key) - } else if _, ok := gs.mu.statsInfoMap[key]; !ok { - gs.mu.statsInfoMap[key] = nil + gs.mu.statsInfoMap[warpKey.Key] = stats + gs.broadcastStats(warpKey.Key) + } else if _, ok := gs.mu.statsInfoMap[warpKey.Key]; !ok { + gs.mu.statsInfoMap[warpKey.Key] = nil } // Notify all the waiters to read the new stats info. gs.mu.cond.Broadcast() - gs.doneUpdate(key, updated) + gs.doneUpdate(warpKey.Key, updated) } -func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { +func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { table := gs.engine.GetLatestCatalogCache().GetTableById(key.AccId, key.DatabaseID, key.TableID) // table or its definition is nil, means that the table is created but not committed yet. if table == nil || table.TableDef == nil { @@ -656,7 +695,7 @@ func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { approxObjectNum, stats, ) - if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil { + if err := UpdateStats(ctx, req, gs.concurrentExecutor); err != nil { logutil.Errorf("failed to init stats info for table %v, err: %v", key, err) return false } diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index 48d89fad6b2cc..354e0c12e6724 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -23,6 +23,8 @@ import ( "time" "github.com/lni/goutils/leaktest" + "github.com/stretchr/testify/assert" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -32,7 +34,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" - "github.com/stretchr/testify/assert" ) func TestGetStats(t *testing.T) { @@ -41,7 +42,7 @@ func TestGetStats(t *testing.T) { defer cancel() gs := NewGlobalStats(ctx, nil, nil, WithUpdateWorkerFactor(4), - WithStatsUpdater(func(key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { + WithStatsUpdater(func(_ context.Context, key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { info.BlockNumber = 20 return true }), @@ -161,7 +162,7 @@ func TestUpdateStats(t *testing.T) { TableID: 1001, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -180,7 +181,7 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -199,7 +200,7 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.True(t, updated) }, WithApproxObjectNumUpdater(func() int64 { return 10