Skip to content

Conversation

MichaHoffmann
Copy link
Contributor

@MichaHoffmann MichaHoffmann commented Jun 8, 2025

Making "coalesce" a logical node, allows us to move it around in the execution tree and eventually coalesce later and evaluate more of the query concurrently.
This will allow us to subsequently treat local execution almost like remote execution, so that we can compute aggregations in shards.

@MichaHoffmann MichaHoffmann force-pushed the mhoffmann/make-coalesce-a-logical-node branch 2 times, most recently from cba94b5 to c8c546f Compare June 8, 2025 14:09
Signed-off-by: Michael Hoffmann <[email protected]>
@MichaHoffmann MichaHoffmann force-pushed the mhoffmann/make-coalesce-a-logical-node branch from c8c546f to 7d35322 Compare June 8, 2025 14:47
Copy link
Contributor

@yeya24 yeya24 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is epic!
Just some small questions

case *VectorSelector:
if parent != nil {
// we coalesce matrix selectors in a different branch
if _, ok := (*parent).(*MatrixSelector); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about subqueries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can parent of vectorselector be a subquery? I guess you could have last_over_time(http_request_total[10m:5s]).

Exprs []Node `json:"-"`
}

func (c *Coalesce) ReturnType() parser.ValueType { return parser.ValueTypeVector }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should do Exprs[0].ReturnType()?

@@ -20,6 +20,10 @@ type Options struct {
DecodingConcurrency int
}

func (o *Options) NumShards() int {
return max(o.DecodingConcurrency, 1)
}
Copy link
Contributor

@yeya24 yeya24 Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking this shard size could be different for different operators? I wonder if the default value works for all operators

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default is numCores / 2?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I meant if different operators should use the same concurrency

Copy link
Contributor

@harry671003 harry671003 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!! Just one nit/question.

"github.com/thanos-io/promql-engine/query"
)

type CoalesceOptimizer struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the optimizer be called something like ConcurrentDecodeOptimizer?

Copy link
Collaborator

@fpetkovski fpetkovski Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea is to allow for things like

sum(
  coalesce(
    sum(shard_a), 
    sum(shard_b)
  )
)

down the line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a different optimizer should be responsible for sharding aggregations?

@@ -20,6 +20,10 @@ type Options struct {
DecodingConcurrency int
}

func (o *Options) NumShards() int {
return max(o.DecodingConcurrency, 1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default is numCores / 2?

@fpetkovski
Copy link
Collaborator

This change potentially modifies the contract with the scanner. I would like to test it with our downstream project this week before we merge it.

v := n.VectorSelector
lm = append(v.LabelMatchers, v.Filters...)
default:
logicalplan.TraverseBottomUp(nil, &o.funcExpr.Args[0], func(parent, node *logicalplan.Node) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in the logical plan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sector labels of the vector sector of the absent function? I think we could put it into the functioncall node and populate it in the plan yeah

@@ -219,6 +219,19 @@ func unmarshalNode(data []byte) (Node, error) {
return nil, err
}
return u, nil
case CoalesceNode:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this cause remote execution to be planned in the central node?

arg2,
opts,
logicalNode.Range,
vs.Offset,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could simplify this by passing in the entire vs object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants