Limit parallel execution of a stage's layer
Previously, the engine executed all modules in a stage's layer in parallel. So if you had 20 independent mapper modules, they all ran at the same time.

This was hindering performance under high load, where a lot of CPU cycles could be consumed while the machine had only a limited number of physical cores available.

We now change that behavior: development mode never executes modules in parallel, and production mode is limited to 2 parallel executions. A future update will make that value dynamic, based on the subscription tied to the request.
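As a rough illustration of the new limiting mechanism (the `errgroup` usage added in `pipeline/process_block.go` below), here is a minimal, self-contained sketch of the pattern; the module names, sleep duration, and output are made up for illustration:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Hypothetical layer of independent module executors.
	layer := []string{"map_a", "map_b", "map_c", "map_d", "map_e"}

	// Production-mode cap introduced by this commit (hard-coded to 2 for now).
	const maxParallelExecutors = 2

	var wg errgroup.Group
	wg.SetLimit(maxParallelExecutors) // at most 2 goroutines run at once, the rest wait their turn

	results := make([]string, len(layer))
	for i, name := range layer {
		i, name := i, name // capture loop variables (pre-Go 1.22 semantics)
		wg.Go(func() error {
			time.Sleep(50 * time.Millisecond) // stand-in for executing the module
			results[i] = name + ": done"
			return nil
		})
	}

	if err := wg.Wait(); err != nil {
		fmt.Println("running executors:", err)
		return
	}

	for _, result := range results {
		fmt.Println(result)
	}
}
```

In development mode the engine skips this concurrent path entirely and runs each layer's modules sequentially.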
maoueh committed Jan 30, 2025
1 parent 645677f commit 2fcfb47
Showing 10 changed files with 127 additions and 52 deletions.
12 changes: 10 additions & 2 deletions docs/release-notes/change-log.md
@@ -9,6 +9,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Limit parallel execution of a stage's layer.

Previously, the engine was executing all modules in a stage's layer in parallel. We now change that behavior: development mode executes every module sequentially, and production mode limits parallelism to 2 (hard-coded) for now.

The auth plugin can control that value dynamically by providing a trusted header `X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count`.

## v1.12.4

@@ -20,8 +27,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Server

* Improve noop-mode: will now only send one signal per bundle, without any data
* Improve logging
* Improve noop-mode: will now only send one signal per bundle, without any data.

* Improve logging.

### Client

4 changes: 2 additions & 2 deletions go.mod
@@ -12,7 +12,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef
@@ -194,7 +194,7 @@ require (
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sync v0.8.0
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.18.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -527,8 +527,8 @@ github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320 h1:2XKZH4m
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338 h1:o3Imquu+RhIdF62OSr/ZxVPsn6jpKHwBV/Upl6P28o0=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338/go.mod h1:cwfI5vaMd+CiwZIL0H0JdP5UDWCZOVFz/ex3L0+o/j4=
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf h1:RFkRnIRUk51fcPn6eWuuecUSYFqmCP5affIMKywPIDU=
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf/go.mod h1:d8NTrjIoiqplrZEYOUXwwFaeB2C8fVeHYLvaJFfa9Xo=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c h1:6WjE2yInE+5jnI7cmCcxOiGZiEs2FQm9Zsg2a9Ivp0Q=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c/go.mod h1:dbfiy9ORrL8c6ldSq+L0H9pg8TOqqu/FsghsgUEWK54=
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLesosMjfwWsEL2i/40mFSkzenEb3M0qTyM=
21 changes: 12 additions & 9 deletions pipeline/pipeline.go
@@ -47,11 +47,14 @@ type Pipeline struct {
postBlockHooks []substreams.BlockHook
postJobHooks []substreams.PostJobHook

wasmRuntime *wasm.Registry
execGraph *exec.Graph
loadedModules map[uint32]wasm.Module
ModuleExecutors [][]exec.ModuleExecutor // Staged module executors
executionStages exec.ExecutionStages
wasmRuntime *wasm.Registry
execGraph *exec.Graph
loadedModules map[uint32]wasm.Module
// StagedModuleExecutors represents all the modules within a stage that should be executed. The
// first level of the 2D list represents the layers within a stage, which are executed sequentially.
// The second level contains the modules within a layer; those can be executed concurrently.
StagedModuleExecutors [][]exec.ModuleExecutor
executionStages exec.ExecutionStages

mapModuleOutput *pbsubstreamsrpc.MapModuleOutput
extraMapModuleOutputs []*pbsubstreamsrpc.MapModuleOutput
@@ -455,7 +458,7 @@ func (p *Pipeline) returnInternalModuleProgressOutputs(clock *pbsubstreams.Clock

// BuildModuleExecutors builds the ModuleExecutors, and the loadedModules.
func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
if p.ModuleExecutors != nil {
if p.StagedModuleExecutors != nil {
// Eventually, we can invalidate our cache to accommodate the PATCH
// and rebuild all the modules, and tear down the previously loaded ones.
return nil
@@ -582,13 +585,13 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
}
}

p.ModuleExecutors = stagedModuleExecutors
p.StagedModuleExecutors = stagedModuleExecutors
return nil
}

func (p *Pipeline) cleanUpModuleExecutors(ctx context.Context) error {
for _, stage := range p.ModuleExecutors {
for _, executor := range stage {
for _, layer := range p.StagedModuleExecutors {
for _, executor := range layer {
if err := executor.Close(ctx); err != nil {
return fmt.Errorf("closing module executor %q: %w", executor.Name(), err)
}
46 changes: 26 additions & 20 deletions pipeline/process_block.go
@@ -6,11 +6,11 @@ import (
"fmt"
"io"
"runtime/debug"
"sync"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams/metering"
"github.com/streamingfast/substreams/metrics"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
@@ -20,6 +20,7 @@ import (
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/execout"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -307,11 +308,14 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
// the ctx is cached in the built moduleExecutors so we only activate timeout here
ctx, cancel := context.WithTimeout(ctx, p.executionTimeout)
defer cancel()
for _, stage := range p.ModuleExecutors {
//t0 := time.Now()
if len(stage) < 2 {
//fmt.Println("Linear stage", len(stage))
for _, executor := range stage {

isDevelopmentMode := reqctx.IsInDevelopmentMode(ctx)
maxParallelExecutor := reqctx.MaxStageLayerParallelExecutor(ctx)
logging.Logger(ctx, p.stores.logger).Debug("executing stage's layers", zap.Int("layer_count", len(p.StagedModuleExecutors)), zap.Uint64("max_parallel_executor", maxParallelExecutor))

for _, layer := range p.StagedModuleExecutors {
if isDevelopmentMode || maxParallelExecutor <= 1 || len(layer) <= 1 {
for _, executor := range layer {
if !executor.RunsOnBlock(blockNum) {
continue
}
@@ -321,40 +325,42 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
}
}
} else {
results := make([]resultObj, len(stage))
wg := sync.WaitGroup{}
//fmt.Println("Parallelized in stage", stageIdx, len(stage))
for i, executor := range stage {
results := make([]resultObj, len(layer))
wg := errgroup.Group{}
wg.SetLimit(int(maxParallelExecutor))

for i, executor := range layer {
if !executor.RunsOnBlock(execOutput.Clock().Number) {
results[i] = resultObj{not_runnable: true}
continue
}
wg.Add(1)
i := i
executor := executor
go func() {
defer wg.Done()

wg.Go(func() error {
res := p.execute(ctx, executor, execOutput)
results[i] = res
}()

return nil
})
}

if err := wg.Wait(); err != nil {
return fmt.Errorf("running executors: %w", err)
}
wg.Wait()

for i, result := range results {
if result.not_runnable {
continue
}
executor := stage[i]
executor := layer[i]
if result.err != nil {
//p.returnFailureProgress(ctx, err, executor)
return fmt.Errorf("running executor %q: %w", executor.Name(), result.err)
}

if err := p.applyExecutionResult(ctx, executor, result, execOutput); err != nil {
return fmt.Errorf("applying executor results %q on block %d (%s): %w", executor.Name(), blockNum, execOutput.Clock(), result.err)
}
}
}
//blockDuration += time.Since(t0)
}

return nil
13 changes: 9 additions & 4 deletions pipeline/resolve.go
@@ -1,10 +1,12 @@
package pipeline

import (
"connectrpc.com/connect"
"context"
"errors"
"fmt"
"sync/atomic"

"connectrpc.com/connect"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
@@ -15,7 +17,6 @@ import (
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/reqctx"
"go.uber.org/zap"
"sync/atomic"
)

type getBlockFunc func() (uint64, error)
@@ -48,7 +49,8 @@ func BuildRequestDetails(
getRecentFinalBlock getBlockFunc,
resolveCursor CursorResolver,
getHeadBlock getBlockFunc,
segmentSize uint64) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error) {
segmentSize uint64,
) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error) {
req = &reqctx.RequestDetails{
Modules: request.Modules,
OutputModule: request.OutputModule,
@@ -95,7 +97,7 @@ func BuildRequestDetails(
return
}

func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails) {
func BuildRequestDetailsFromSubrequest(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails) {
req = &reqctx.RequestDetails{
Modules: request.Modules,
OutputModule: request.OutputModule,
@@ -107,6 +109,9 @@ func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest
ResolvedStartBlockNum: request.StartBlock(),
UniqueID: nextUniqueID(),
}

req.SetStageLayerParallelExecutorCountFromContext(ctx)

return req
}

34 changes: 34 additions & 0 deletions reqctx/context.go
@@ -91,6 +91,40 @@ func WithEmitter(ctx context.Context, emitter dmetering.EventEmitter) context.Co
return context.WithValue(ctx, emitterKey, emitter)
}

func IsInDevelopmentMode(ctx context.Context) bool {
details := Details(ctx)
if details == nil {
return true
}

return !details.ProductionMode
}

const defaultMaxStageLayerParallelExecutorCount = 2

const safeguardMaxStageLayerParallelExecutorCount = 16

// MaxStageLayerParallelExecutor returns the maximum number of parallel executors (e.g. goroutines)
// that can run at the same time for a particular stage's layer, as configured and accepted by the
// auth plugin.
//
// If no request details are attached to the context, it returns 1. If the value is unset, it
// returns the default of 2, and any configured value is capped at a safeguard maximum.
func MaxStageLayerParallelExecutor(ctx context.Context) uint64 {
details := Details(ctx)
if details == nil {
// Always give at least 1, but there should always be a details object attached to the context
return 1
}

// If unset, provide default value which is 2 for now
if details.MaxStageLayerParallelExecutor == 0 {
return defaultMaxStageLayerParallelExecutorCount
}

// Protect in case of misconfiguration to cap at a sane system max value
return min(details.MaxStageLayerParallelExecutor, safeguardMaxStageLayerParallelExecutorCount)
}

type ISpan interface {
// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
28 changes: 23 additions & 5 deletions reqctx/request.go
@@ -1,8 +1,10 @@
package reqctx

import (
"context"
"strconv"

"github.com/streamingfast/dauth"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

@@ -17,11 +19,12 @@ type RequestDetails struct {
ResolvedStartBlockNum uint64
ResolvedCursor string

LinearHandoffBlockNum uint64
LinearGateBlockNum uint64
StopBlockNum uint64
MaxParallelJobs uint64
UniqueID uint64
LinearHandoffBlockNum uint64
LinearGateBlockNum uint64
StopBlockNum uint64
MaxParallelJobs uint64
MaxStageLayerParallelExecutor uint64
UniqueID uint64

ProductionMode bool
IsTier2Request bool
Expand All @@ -44,3 +47,18 @@ func (d *RequestDetails) ShouldStreamCachedOutputs() bool {
return d.ProductionMode &&
d.ResolvedStartBlockNum < d.LinearHandoffBlockNum
}

// SetStageLayerParallelExecutorCountFromContext sets the MaxStageLayerParallelExecutor from the context
// by first retrieving the dauth trusted headers and then parsing the value from the header, if present.
func (d *RequestDetails) SetStageLayerParallelExecutorCountFromContext(ctx context.Context) {
trustedHeaders := dauth.FromContext(ctx)
if trustedHeaders == nil {
return
}

if parallelExecutors := trustedHeaders.Get("X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count"); parallelExecutors != "" {
if count, err := strconv.ParseUint(parallelExecutors, 10, 64); err == nil {
d.MaxStageLayerParallelExecutor = count
}
}
}
13 changes: 7 additions & 6 deletions service/tier1.go
@@ -470,18 +470,19 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
cacheTag := s.runtimeConfig.DefaultCacheTag
if auth := dauth.FromContext(ctx); auth != nil {
if parallelJobs := auth.Get("X-Sf-Substreams-Parallel-Jobs"); parallelJobs != "" {
if ll, err := strconv.ParseUint(parallelJobs, 10, 64); err == nil {
requestDetails.MaxParallelJobs = ll
if count, err := strconv.ParseUint(parallelJobs, 10, 64); err == nil {
requestDetails.MaxParallelJobs = count
}
}
if ct := auth.Get("X-Sf-Substreams-Cache-Tag"); ct != "" {
if IsValidCacheTag(ct) {
cacheTag = ct
if tag := auth.Get("X-Sf-Substreams-Cache-Tag"); tag != "" {
if IsValidCacheTag(tag) {
cacheTag = tag
} else {
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and undescores", ct)
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and underscores", tag)
}
}

requestDetails.SetStageLayerParallelExecutorCountFromContext(ctx)
}

traceId := tracing.GetTraceID(ctx).String()
4 changes: 2 additions & 2 deletions service/tier2.go
@@ -283,7 +283,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return stream.NewErrInvalidArg(err.Error())
}

requestDetails := pipeline.BuildRequestDetailsFromSubrequest(request)
requestDetails := pipeline.BuildRequestDetailsFromSubrequest(ctx, request)
ctx = reqctx.WithRequest(ctx, requestDetails)
if s.moduleExecutionTracing {
ctx = reqctx.WithModuleExecutionTracing(ctx)
@@ -376,7 +376,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P

allExecutorsExcludedByBlockIndex := true
excludable:
for _, stage := range pipe.ModuleExecutors {
for _, stage := range pipe.StagedModuleExecutors {
for _, executor := range stage {
switch executor := executor.(type) {
case *exec.MapperModuleExecutor:
Expand Down
