Skip to content

Commit 7b0afb8

Browse files
authored
Merge pull request #572 from streamingfast/feature/max-tier1-requests
Added max tier1 request
2 parents 66d9b28 + 3eb4d86 commit 7b0afb8

File tree

8 files changed

+251
-42
lines changed

8 files changed

+251
-42
lines changed

app/tier1.go

+26-12
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ import (
77
"time"
88

99
"connectrpc.com/connect"
10-
"github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect"
11-
ssconnect "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect"
12-
"github.com/streamingfast/substreams/reqctx"
13-
"github.com/streamingfast/substreams/wasm/wazero"
14-
1510
"github.com/streamingfast/bstream"
1611
"github.com/streamingfast/bstream/blockstream"
1712
"github.com/streamingfast/bstream/hub"
@@ -23,8 +18,12 @@ import (
2318
"github.com/streamingfast/shutter"
2419
"github.com/streamingfast/substreams/client"
2520
"github.com/streamingfast/substreams/metrics"
21+
"github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect"
22+
ssconnect "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect"
23+
"github.com/streamingfast/substreams/reqctx"
2624
"github.com/streamingfast/substreams/service"
2725
"github.com/streamingfast/substreams/wasm"
26+
"github.com/streamingfast/substreams/wasm/wazero"
2827
"go.uber.org/atomic"
2928
"go.uber.org/zap"
3029
)
@@ -56,11 +55,13 @@ type Tier1Config struct {
5655
BlockExecutionTimeout time.Duration
5756
TmpDir string
5857

59-
StateStoreURL string
60-
StateStoreDefaultTag string
61-
BlockType string
62-
StateBundleSize uint64
63-
EnforceCompression bool // refuse incoming requests that do not accept gzip compression (ConnectRPC or GRPC)
58+
StateStoreURL string
59+
StateStoreDefaultTag string
60+
BlockType string
61+
StateBundleSize uint64
62+
EnforceCompression bool // refuse incoming requests that do not accept gzip compression (ConnectRPC or GRPC)
63+
ActiveRequestsSoftLimit int // maximum number of active requests a tier1 app can have with external clients before starting to advertise itself as unready in the health check
64+
ActiveRequestsHardLimit int // maximum number of active requests a tier1 app can have with external clients, refuse with CodeUnavailable if reached
6465

6566
MaxSubrequests uint64
6667
SubrequestsEndpoint string
@@ -199,9 +200,12 @@ func (a *Tier1App) Run() error {
199200
a.config.MaxSubrequests,
200201
a.config.StateBundleSize,
201202
a.config.BlockType,
203+
a.setIsReady,
202204
subrequestsClientConfig,
203205
tier2RequestParameters,
204206
a.config.EnforceCompression,
207+
a.config.ActiveRequestsSoftLimit,
208+
a.config.ActiveRequestsHardLimit,
205209
opts...,
206210
)
207211
if err != nil {
@@ -230,14 +234,14 @@ func (a *Tier1App) Run() error {
230234
a.logger.Info("waiting until hub is real-time synced")
231235
select {
232236
case <-forkableHub.Ready:
233-
metrics.AppReadinessTier1.SetReady()
237+
// Wait until the hub is ready
234238
case <-a.Terminating():
235239
return
236240
}
237241
}
238242

239243
a.logger.Info("launching gRPC server", zap.Bool("live_support", withLive))
240-
a.isReady.CompareAndSwap(false, true)
244+
a.setIsReady(true)
241245

242246
err := service.ListenTier1(a.config.GRPCListenAddr, svc, infoServer, a.modules.Authenticator, a.logger, a.HealthCheck)
243247
a.Shutdown(err)
@@ -267,6 +271,16 @@ func (a *Tier1App) IsReady(ctx context.Context) bool {
267271
return a.isReady.Load()
268272
}
269273

274+
func (a *Tier1App) setIsReady(ready bool) {
275+
if ready {
276+
a.isReady.Store(true)
277+
metrics.AppReadinessTier1.SetReady()
278+
} else {
279+
a.isReady.Store(false)
280+
metrics.AppReadinessTier1.SetNotReady()
281+
}
282+
}
283+
270284
// Validate inspects itself to determine if the current config is valid according to
271285
// substreams rules.
272286
func (config *Tier1Config) Validate() error {

app/tier2.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (a *Tier2App) Run() error {
8080
opts = append(opts, service.WithBlockExecutionTimeout(a.config.BlockExecutionTimeout))
8181
}
8282

83-
opts = append(opts, service.WithReadinessFunc(a.setReadiness))
83+
opts = append(opts, service.WithReadinessFunc(a.setIsReady))
8484

8585
if a.config.TmpDir != "" {
8686
wazero.SetTempDir(a.config.TmpDir)
@@ -103,12 +103,11 @@ func (a *Tier2App) Run() error {
103103
return fmt.Errorf("failed to setup trust authenticator: %w", err)
104104
}
105105

106-
a.OnTerminating(func(_ error) { metrics.AppReadinessTier2.SetNotReady() })
106+
a.OnTerminating(func(_ error) { a.setIsReady(false) })
107107

108108
go func() {
109109
a.logger.Info("launching gRPC server")
110-
a.isReady.CompareAndSwap(false, true)
111-
metrics.AppReadinessTier2.SetReady()
110+
a.setIsReady(false)
112111

113112
err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck)
114113
a.Shutdown(err)
@@ -135,8 +134,14 @@ func (a *Tier2App) IsReady(ctx context.Context) bool {
135134
return a.isReady.Load()
136135
}
137136

138-
func (a *Tier2App) setReadiness(ready bool) {
137+
func (a *Tier2App) setIsReady(ready bool) {
139138
a.isReady.Store(ready)
139+
140+
if ready {
141+
metrics.AppReadinessTier2.SetReady()
142+
} else {
143+
metrics.AppReadinessTier2.SetNotReady()
144+
}
140145
}
141146

142147
// Validate inspects itself to determine if the current config is valid according to

docs/release-notes/change-log.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
* Fix detection of accepted gzip compression when multiple values are sent in the `Grpc-Accept-Encoding` header (ex: Python library)
1818
* Properly accept and compress responses with `gzip` for browser HTTP clients using ConnectWeb with `Accept-Encoding` header
1919
* Allow setting subscription channel max capacity via `SOURCE_CHAN_SIZE` env var (default: 100)
20+
* Added tier1 app configuration option to limit max active requests a single instance can accept before starting to reject them with 'Unavailable' gRPC code.
2021

2122
### Client-side
2223

metrics/metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
var MetricSet = dmetrics.NewSet()
1111

12-
var ActiveSubstreams = MetricSet.NewGauge("substreams_active_requests", "Number of active Substreams requests")
12+
var ActiveRequests = MetricSet.NewGauge("substreams_active_requests", "Number of active Substreams requests")
1313
var SubstreamsCounter = MetricSet.NewCounter("substreams_counter", "Substreams requests count")
1414

1515
var BlockBeginProcess = MetricSet.NewCounter("substreams_block_process_start_counter", "Counter for total block processes started, used for rate")

service/options.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func WithReadinessFunc(f func(bool)) Option {
6767
case *Tier1Service:
6868
// not used
6969
case *Tier2Service:
70-
s.setReadyFunc = f
70+
s.appSetIsReadyState = f
7171
}
7272
}
7373
}

service/tier1.go

+101-21
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,22 @@ import (
1313
"sync"
1414
"time"
1515

16-
"github.com/streamingfast/substreams/metering"
17-
16+
"connectrpc.com/connect"
1817
"github.com/streamingfast/bstream"
19-
"github.com/streamingfast/dgrpc"
20-
2118
"github.com/streamingfast/bstream/hub"
2219
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
2320
bsstream "github.com/streamingfast/bstream/stream"
2421
"github.com/streamingfast/dauth"
22+
"github.com/streamingfast/dgrpc"
2523
"github.com/streamingfast/dmetering"
24+
"github.com/streamingfast/dmetrics"
2625
"github.com/streamingfast/dstore"
2726
"github.com/streamingfast/logging"
2827
tracing "github.com/streamingfast/sf-tracing"
2928
"github.com/streamingfast/shutter"
30-
31-
"connectrpc.com/connect"
3229
"github.com/streamingfast/substreams"
3330
"github.com/streamingfast/substreams/client"
31+
"github.com/streamingfast/substreams/metering"
3432
"github.com/streamingfast/substreams/metrics"
3533
"github.com/streamingfast/substreams/orchestrator/plan"
3634
"github.com/streamingfast/substreams/orchestrator/work"
@@ -69,12 +67,17 @@ type Tier1Service struct {
6967
tracer ttrace.Tracer
7068
logger *zap.Logger
7169

70+
// You can call this function to switch the parent app to be ready or not ready influencing the health check,
71+
// it's provided by [app.Tier1App] and tied to the health check endpoint.
72+
appSetIsReadyState func(isReady bool)
7273
getRecentFinalBlock func() (uint64, error)
7374
resolveCursor pipeline.CursorResolver
7475
getHeadBlock func() (uint64, error)
7576

76-
enforceCompression bool
77-
tier2RequestParameters reqctx.Tier2RequestParameters
77+
enforceCompression bool
78+
activeRequestsSoftLimit int
79+
activeRequestsHardLimit int
80+
tier2RequestParameters reqctx.Tier2RequestParameters
7881
}
7982

8083
func getBlockTypeFromStreamFactory(sf *StreamFactory) (string, error) {
@@ -129,9 +132,12 @@ func NewTier1(
129132
stateBundleSize uint64,
130133
blockType string,
131134

135+
appSetIsReadyState func(isReady bool),
132136
substreamsClientConfig *client.SubstreamsClientConfig,
133137
tier2RequestParameters reqctx.Tier2RequestParameters,
134138
enforceCompression bool,
139+
activeRequestsSoftLimit int,
140+
activeRequestsHardLimit int,
135141
opts ...Option,
136142
) (*Tier1Service, error) {
137143

@@ -168,16 +174,19 @@ func NewTier1(
168174

169175
logger.Info("launching tier1 service", zap.Reflect("client_config", substreamsClientConfig), zap.String("block_type", blockType), zap.Bool("with_live", hub != nil))
170176
s := &Tier1Service{
171-
Shutter: shutter.New(),
172-
runtimeConfig: runtimeConfig,
173-
blockType: blockType,
174-
tracer: tracing.GetTracer(),
175-
failedRequests: make(map[string]*recordedFailure),
176-
resolveCursor: pipeline.NewCursorResolver(hub, mergedBlocksStore, forkedBlocksStore),
177-
logger: logger,
178-
tier2RequestParameters: tier2RequestParameters,
179-
blockExecutionTimeout: 3 * time.Minute,
180-
enforceCompression: enforceCompression,
177+
Shutter: shutter.New(),
178+
runtimeConfig: runtimeConfig,
179+
blockType: blockType,
180+
tracer: tracing.GetTracer(),
181+
failedRequests: make(map[string]*recordedFailure),
182+
resolveCursor: pipeline.NewCursorResolver(hub, mergedBlocksStore, forkedBlocksStore),
183+
logger: logger,
184+
appSetIsReadyState: appSetIsReadyState,
185+
tier2RequestParameters: tier2RequestParameters,
186+
blockExecutionTimeout: 3 * time.Minute,
187+
enforceCompression: enforceCompression,
188+
activeRequestsSoftLimit: activeRequestsSoftLimit,
189+
activeRequestsHardLimit: activeRequestsHardLimit,
181190
}
182191

183192
s.streamFactoryFunc = sf.New
@@ -218,7 +227,7 @@ func (s *Tier1Service) Blocks(
218227
compressed = true
219228
}
220229
if s.enforceCompression && !compressed {
221-
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Your client does not accept gzip- or zstd-compressed streams. Check how to enable it on your GRPC or ConnectRPC client"))
230+
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("your client does not accept gzip- or zstd-compressed streams. Check how to enable it on your gRPC or ConnectRPC client"))
222231
}
223232

224233
request := req.Msg
@@ -284,10 +293,28 @@ func (s *Tier1Service) Blocks(
284293
}
285294
}
286295

296+
status := s.getOverloadedStatus()
297+
298+
// Set us as unready if the soft limit would be reached by this request
299+
if status.softLimitWouldBeReached() {
300+
s.appSetIsReadyState(false)
301+
}
302+
303+
// Refuse the request if the hard limit is currently reached by this instance
304+
if status.hardLimitReached() {
305+
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service is currently not accepting new requests, re-connect back right away to be balanced to a non-full node"))
306+
}
307+
287308
logger.Info("incoming Substreams Blocks request", fields...)
288309
metrics.SubstreamsCounter.Inc()
289-
metrics.ActiveSubstreams.Inc()
290-
defer metrics.ActiveSubstreams.Dec()
310+
metrics.ActiveRequests.Inc()
311+
defer func() {
312+
metrics.ActiveRequests.Dec()
313+
314+
if status := s.getOverloadedStatus(); status.canAcceptUpcomingRequests() {
315+
s.appSetIsReadyState(true)
316+
}
317+
}()
291318

292319
requestID := fmt.Sprintf("%s:%d:%d:%s:%t:%t:%s",
293320
outputModuleHash,
@@ -737,3 +764,56 @@ func matchHeader(header http.Header) bool {
737764
}
738765
return false
739766
}
767+
768+
type overloadingStatus struct {
769+
// set only if either soft or hard limit set is > 0
770+
activeRequestCount int
771+
softLimit int
772+
hardLimit int
773+
}
774+
775+
// softLimitWouldBeReached returns true if the soft limit would be reached if one more request was added.
776+
func (s *overloadingStatus) softLimitWouldBeReached() bool {
777+
return s.softLimit > 0 && s.activeRequestCount+1 >= s.softLimit
778+
}
779+
780+
// hardLimitReached returns true if the hard limit is actually reached from the active request count.
781+
func (s *overloadingStatus) hardLimitReached() bool {
782+
return s.hardLimit > 0 && s.activeRequestCount >= s.hardLimit
783+
}
784+
785+
// canAcceptUpcomingRequests returns true if the service can accept upcoming new requests.
786+
func (s *overloadingStatus) canAcceptUpcomingRequests() bool {
787+
if s.softLimit <= 0 && s.hardLimit <= 0 {
788+
return true
789+
}
790+
791+
if s.softLimit > 0 && s.activeRequestCount >= s.softLimit {
792+
return false
793+
}
794+
795+
if s.hardLimit > 0 && s.activeRequestCount >= s.hardLimit {
796+
return false
797+
}
798+
799+
return true
800+
}
801+
802+
func (s *Tier1Service) getOverloadedStatus() (status overloadingStatus) {
803+
// Never overloaded if both soft & hard limit are 0, -1 or anything less
804+
if s.activeRequestsSoftLimit <= 0 && s.activeRequestsHardLimit <= 0 {
805+
return
806+
}
807+
808+
activeRequestCount := s.getActiveRequestCount()
809+
810+
return overloadingStatus{
811+
activeRequestCount: activeRequestCount,
812+
softLimit: s.activeRequestsSoftLimit,
813+
hardLimit: s.activeRequestsHardLimit,
814+
}
815+
}
816+
817+
func (s *Tier1Service) getActiveRequestCount() int {
818+
return int(dmetrics.NewValueFromMetric(metrics.ActiveRequests, "requests").ValueUint())
819+
}

0 commit comments

Comments
 (0)