Skip to content

Commit 8d93819

Browse files
committed
use observedPipeline for all pipelines
1 parent 1ad9172 commit 8d93819

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

pkg/engine/internal/executor/executor.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,27 +92,27 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
9292
// which wraps the pipeline with a topk/limit without reintroducing
9393
// planning cost for thousands of scan nodes.
9494
return newLazyPipeline(func(ctx context.Context, _ []Pipeline) Pipeline {
95-
return tracePipeline("physical.DataObjScan", c.executeDataObjScan(ctx, n, nodeRegion))
95+
return newObservedPipeline(c.executeDataObjScan(ctx, n, nodeRegion))
9696
}, inputs)
9797

9898
case *physical.TopK:
99-
return tracePipeline("physical.TopK", c.executeTopK(ctx, n, inputs, nodeRegion))
99+
return newObservedPipeline(c.executeTopK(ctx, n, inputs, nodeRegion))
100100
case *physical.Limit:
101-
return tracePipeline("physical.Limit", c.executeLimit(ctx, n, inputs, nodeRegion))
101+
return newObservedPipeline(c.executeLimit(ctx, n, inputs, nodeRegion))
102102
case *physical.Filter:
103-
return tracePipeline("physical.Filter", c.executeFilter(ctx, n, inputs, nodeRegion))
103+
return newObservedPipeline(c.executeFilter(ctx, n, inputs, nodeRegion))
104104
case *physical.Projection:
105-
return tracePipeline("physical.Projection", c.executeProjection(ctx, n, inputs, nodeRegion))
105+
return newObservedPipeline(c.executeProjection(ctx, n, inputs, nodeRegion))
106106
case *physical.RangeAggregation:
107-
return tracePipeline("physical.RangeAggregation", c.executeRangeAggregation(ctx, n, inputs, nodeRegion))
107+
return newObservedPipeline(c.executeRangeAggregation(ctx, n, inputs, nodeRegion))
108108
case *physical.VectorAggregation:
109-
return tracePipeline("physical.VectorAggregation", c.executeVectorAggregation(ctx, n, inputs, nodeRegion))
109+
return newObservedPipeline(c.executeVectorAggregation(ctx, n, inputs, nodeRegion))
110110
case *physical.ColumnCompat:
111-
return tracePipeline("physical.ColumnCompat", c.executeColumnCompat(ctx, n, inputs, nodeRegion))
111+
return newObservedPipeline(c.executeColumnCompat(ctx, n, inputs, nodeRegion))
112112
case *physical.Parallelize:
113-
return tracePipeline("physical.Parallelize", c.executeParallelize(ctx, n, inputs))
113+
return c.executeParallelize(ctx, n, inputs)
114114
case *physical.ScanSet:
115-
return tracePipeline("physical.ScanSet", c.executeScanSet(ctx, n, nodeRegion))
115+
return c.executeScanSet(ctx, n, nodeRegion)
116116
default:
117117
return errorPipeline(ctx, fmt.Errorf("invalid node type: %T", node))
118118
}

0 commit comments

Comments
 (0)