Skip to content
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3090cbf
add blob submission metrics
gupadhyaya Sep 12, 2025
b25c77f
fix tests
gupadhyaya Sep 12, 2025
c451259
fix build errors
gupadhyaya Sep 12, 2025
931387a
fix api
gupadhyaya Sep 12, 2025
43e6746
shorten
gupadhyaya Sep 12, 2025
b4f3dde
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 12, 2025
0487333
remove duplicates
gupadhyaya Sep 16, 2025
6be3ead
fix format issue
gupadhyaya Sep 16, 2025
17a30e2
Fix bitswap compilation issues and enable metrics in tastora framework
gupadhyaya Sep 17, 2025
6bc2316
Remove HashOnRead method from BlockstoreWithMetrics
gupadhyaya Sep 17, 2025
962f115
Update tastora framework to use local Docker image with metrics
gupadhyaya Sep 17, 2025
7198aa5
Remove interface compliance check for BlockstoreWithMetrics
gupadhyaya Sep 17, 2025
80e2a6a
fix coreaccessor with new metrics instead of nil, fx.invoke issue
gupadhyaya Sep 17, 2025
af3953a
fix blob metrics
gupadhyaya Sep 17, 2025
2af16ef
go.mod changes
gupadhyaya Sep 17, 2025
06e3513
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 17, 2025
4659fbe
go mod tidy
gupadhyaya Sep 17, 2025
871bd90
blob metrics fixes
gupadhyaya Sep 18, 2025
ccbb3a3
fix
gupadhyaya Sep 18, 2025
e00d3c7
remove unnecessary method
gupadhyaya Sep 18, 2025
e98716f
remove debug stuff
gupadhyaya Sep 18, 2025
ab8d1b2
WithMetrics
gupadhyaya Sep 18, 2025
0f2a3cf
remove passing nil metrics
gupadhyaya Sep 18, 2025
5c1a2f6
private metrics, stop method
gupadhyaya Sep 18, 2025
88ba0e0
cleanup
gupadhyaya Sep 18, 2025
88e78ba
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 18, 2025
510b48c
blob metrics changes
gupadhyaya Sep 18, 2025
0044277
remove nil checks
gupadhyaya Sep 19, 2025
ab99226
get rid of duplicate legacy state metrics
gupadhyaya Sep 19, 2025
29c9a4e
optimizing metrics recording
gupadhyaya Sep 19, 2025
1ba9c6c
error out if metrics fail
gupadhyaya Sep 19, 2025
c604d7b
named error
gupadhyaya Sep 19, 2025
74a42d1
skip resp name
gupadhyaya Sep 19, 2025
878d1bf
error as bool/enum to avoid cardinality explosion
gupadhyaya Sep 19, 2025
9356b77
golangci-lint fix
gupadhyaya Sep 19, 2025
06a3961
unify error counting
gupadhyaya Sep 19, 2025
b6f7a3d
remove magic strings
gupadhyaya Sep 22, 2025
96b085a
golangci-lint fixes
gupadhyaya Sep 23, 2025
f110c2d
further fix golangci-lint
gupadhyaya Sep 23, 2025
533f70e
fixing docker security issue
gupadhyaya Sep 23, 2025
344da58
Revert "fixing docker security issue"
gupadhyaya Sep 23, 2025
f7d2d64
Merge origin/main into blob_submission_metrics
gupadhyaya Oct 14, 2025
0de2e67
Fix compilation error after merge
gupadhyaya Oct 14, 2025
380ea9e
Apply goimports-reviser formatting to fix import organization
gupadhyaya Oct 14, 2025
5ac90dc
Fix import ordering in state/core_access.go
gupadhyaya Oct 14, 2025
b81bfc0
Fix remaining issues after merge
gupadhyaya Oct 14, 2025
4495e04
minor
gupadhyaya Oct 14, 2025
5006f04
go mod tidy
gupadhyaya Oct 14, 2025
0d3d4e4
remove unnecessary metrics
gupadhyaya Oct 21, 2025
70ad967
Merge branch 'main' into blob_submission_metrics
gupadhyaya Oct 21, 2025
c373bd9
Remove accidentally committed grafana and metrics files
gupadhyaya Oct 21, 2025
5ac205d
Fix TestListenerWithWrongChainRPC cleanup issue
gupadhyaya Oct 21, 2025
03d26e7
Merge remote-tracking branch 'origin/main' into blob_submission_metrics
gupadhyaya Oct 28, 2025
ace5b72
fix
gupadhyaya Oct 28, 2025
24a6adc
minor fix
gupadhyaya Oct 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (c *Client) initTxClient(
trustedHeadGetter{remote: c.Header},
conn,
submitCfg.Network.String(),
nil, // metrics not available in client context
)
if err != nil {
return err
Expand All @@ -108,7 +109,7 @@ func (c *Client) initTxClient(
c.State = core

// setup blob submission service using core
blobSvc := blob.NewService(core, nil, nil, nil)
blobSvc := blob.NewService(core, nil, nil, nil, nil) // metrics not available in client context
err = blobSvc.Start(ctx)
if err != nil {
return err
Expand Down
196 changes: 196 additions & 0 deletions blob/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package blob

import (
"context"
"errors"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/fx"
)

var meter = otel.Meter("blob")

// Metrics bundles the OpenTelemetry instruments and the in-process tallies
// used to observe blob retrieval and blob proof operations.
//
// A nil *Metrics is usable: the Observe* methods return immediately when the
// receiver is nil.
type Metrics struct {
	// Retrieval counters: successful retrievals, failures other than
	// "not found", and "not found" failures respectively.
	retrievalCounter, retrievalErrors, retrievalNotFound metric.Int64Counter
	// Latency of retrieval operations, recorded in seconds.
	retrievalDuration metric.Float64Histogram

	// Proof counters: successful proofs and failed proofs respectively.
	proofCounter, proofErrors metric.Int64Counter
	// Latency of proof operations, recorded in seconds.
	proofDuration metric.Float64Histogram

	// Atomic running totals maintained by the Observe* methods. The totals
	// for retrievals and proofs count every call, successful or not.
	totalRetrievals, totalRetrievalErrors atomic.Int64
	totalProofs, totalProofErrors         atomic.Int64
}

// WithMetrics builds the blob Metrics set on the package-level meter.
//
// It creates the synchronous counters and histograms for retrievals and
// proofs, then two observable counters whose values are read from the atomic
// totals kept on the returned Metrics. The callback feeding the observable
// counters is unregistered by an OnStop hook appended to lc.
//
// The first instrument or callback registration error aborts construction
// and is returned as-is together with a nil *Metrics.
func WithMetrics(lc fx.Lifecycle) (*Metrics, error) {
	var (
		m   = new(Metrics)
		err error
	)

	// Synchronous retrieval instruments.
	m.retrievalCounter, err = meter.Int64Counter(
		"blob_retrieval_total",
		metric.WithDescription("Total number of blob retrieval operations"),
	)
	if err != nil {
		return nil, err
	}

	m.retrievalDuration, err = meter.Float64Histogram(
		"blob_retrieval_duration_seconds",
		metric.WithDescription("Duration of blob retrieval operations"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, err
	}

	m.retrievalErrors, err = meter.Int64Counter(
		"blob_retrieval_errors_total",
		metric.WithDescription("Total number of blob retrieval errors"),
	)
	if err != nil {
		return nil, err
	}

	m.retrievalNotFound, err = meter.Int64Counter(
		"blob_retrieval_not_found_total",
		metric.WithDescription("Total number of blob not found errors"),
	)
	if err != nil {
		return nil, err
	}

	// Synchronous proof instruments.
	m.proofCounter, err = meter.Int64Counter(
		"blob_proof_total",
		metric.WithDescription("Total number of blob proof operations"),
	)
	if err != nil {
		return nil, err
	}

	m.proofDuration, err = meter.Float64Histogram(
		"blob_proof_duration_seconds",
		metric.WithDescription("Duration of blob proof operations"),
		metric.WithUnit("s"),
	)
	if err != nil {
		return nil, err
	}

	m.proofErrors, err = meter.Int64Counter(
		"blob_proof_errors_total",
		metric.WithDescription("Total number of blob proof errors"),
	)
	if err != nil {
		return nil, err
	}

	// Observable counters, fed from the atomic totals at collection time.
	retrievalObs, err := meter.Int64ObservableCounter(
		"blob_retrieval_total_observable",
		metric.WithDescription("Observable total number of blob retrievals"),
	)
	if err != nil {
		return nil, err
	}

	proofObs, err := meter.Int64ObservableCounter(
		"blob_proof_total_observable",
		metric.WithDescription("Observable total number of blob proofs"),
	)
	if err != nil {
		return nil, err
	}

	registration, err := meter.RegisterCallback(
		func(_ context.Context, obs metric.Observer) error {
			obs.ObserveInt64(retrievalObs, m.totalRetrievals.Load())
			obs.ObserveInt64(proofObs, m.totalProofs.Load())
			return nil
		},
		retrievalObs,
		proofObs,
	)
	if err != nil {
		return nil, err
	}

	// Drop the callback when the fx application stops.
	lc.Append(fx.Hook{
		OnStop: func(context.Context) error {
			return registration.Unregister()
		},
	})

	return m, nil
}

// ObserveRetrieval records the outcome and latency of one blob retrieval.
//
// Parameters:
//   - ctx is passed through to the underlying instruments.
//   - duration is the elapsed time of the operation; it is recorded in seconds.
//   - err is the operation's result; nil means success.
//
// Exactly one counter is incremented per call: the success counter when err
// is nil, the not-found counter when err matches ErrBlobNotFound, and the
// error counter otherwise. The duration histogram is always recorded and
// carries the same attributes as the counter.
//
// Calling this on a nil *Metrics is a no-op.
func (m *Metrics) ObserveRetrieval(ctx context.Context, duration time.Duration, err error) {
	if m == nil {
		return
	}

	// The "error" attribute takes values from this fixed set only. Using the
	// raw error text as an attribute value would create a new time series for
	// every distinct message (heights, commitments, wrapped causes, ...).
	const (
		errAttrKey      = "error"
		errKindNotFound = "not_found"
		errKindFailed   = "failed"
	)

	m.totalRetrievals.Add(1)

	counter := m.retrievalCounter
	var attrs []attribute.KeyValue
	if err != nil {
		m.totalRetrievalErrors.Add(1)

		kind := errKindFailed
		counter = m.retrievalErrors
		if errors.Is(err, ErrBlobNotFound) {
			kind = errKindNotFound
			counter = m.retrievalNotFound
		}
		attrs = append(attrs, attribute.String(errAttrKey, kind))
	}

	opt := metric.WithAttributes(attrs...)
	counter.Add(ctx, 1, opt)
	m.retrievalDuration.Record(ctx, duration.Seconds(), opt)
}

// ObserveProof records the outcome and latency of one blob proof operation.
//
// Parameters:
//   - ctx is passed through to the underlying instruments.
//   - duration is the elapsed time of the operation; it is recorded in seconds.
//   - err is the operation's result; nil means success.
//
// The success counter is incremented when err is nil, the error counter
// otherwise. The duration histogram is always recorded and carries the same
// attributes as the counter.
//
// Calling this on a nil *Metrics is a no-op.
func (m *Metrics) ObserveProof(ctx context.Context, duration time.Duration, err error) {
	if m == nil {
		return
	}

	// The "error" attribute takes a single fixed value. Using the raw error
	// text as an attribute value would create a new time series for every
	// distinct message.
	const (
		errAttrKey    = "error"
		errKindFailed = "failed"
	)

	m.totalProofs.Add(1)

	counter := m.proofCounter
	var attrs []attribute.KeyValue
	if err != nil {
		m.totalProofErrors.Add(1)
		counter = m.proofErrors
		attrs = append(attrs, attribute.String(errAttrKey, errKindFailed))
	}

	opt := metric.WithAttributes(attrs...)
	counter.Add(ctx, 1, opt)
	m.proofDuration.Record(ctx, duration.Seconds(), opt)
}
13 changes: 12 additions & 1 deletion blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"slices"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/types"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -61,19 +62,23 @@ type Service struct {
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
// headerSub subscribes to new headers to supply to blob subscriptions.
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)
// metrics tracks blob-related metrics
metrics *Metrics
}

// NewService assembles a blob Service from its collaborators.
//
// Each argument is stored on the returned Service without validation.
// metrics may be nil; the Metrics Observe* methods are no-ops on a nil
// receiver.
func NewService(
	submitter Submitter,
	getter shwap.Getter,
	headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error),
	headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error),
	metrics *Metrics,
) *Service {
	svc := new(Service)
	svc.blobSubmitter = submitter
	svc.shareGetter = getter
	svc.headerGetter = headerGetter
	svc.headerSub = headerSub
	svc.metrics = metrics
	return svc
}

Expand Down Expand Up @@ -196,9 +201,12 @@ func (s *Service) Get(
namespace libshare.Namespace,
commitment Commitment,
) (blob *Blob, err error) {
start := time.Now()
ctx, span := tracer.Start(ctx, "get")
defer func() {
utils.SetStatusAndEnd(span, err)
// Record metrics
s.metrics.ObserveRetrieval(ctx, time.Since(start), err)
}()
span.SetAttributes(
attribute.Int64("height", int64(height)),
Expand All @@ -210,7 +218,7 @@ func (s *Service) Get(
}}

blob, _, err = s.retrieve(ctx, height, namespace, sharesParser)
return
return blob, err
}

// GetProof returns an NMT inclusion proof for a specified namespace to the respective row roots
Expand All @@ -222,9 +230,12 @@ func (s *Service) GetProof(
namespace libshare.Namespace,
commitment Commitment,
) (proof *Proof, err error) {
start := time.Now()
ctx, span := tracer.Start(ctx, "get-proof")
defer func() {
utils.SetStatusAndEnd(span, err)
// Record metrics
s.metrics.ObserveProof(ctx, time.Since(start), err)
}()
span.SetAttributes(
attribute.Int64("height", int64(height)),
Expand Down
4 changes: 2 additions & 2 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) *Ser
nd, err := eds.NamespaceData(ctx, accessor, ns)
return nd, err
})
return NewService(nil, shareGetter, fn, fn2)
return NewService(nil, shareGetter, fn, fn2, nil)
}

func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *Service {
Expand Down Expand Up @@ -945,7 +945,7 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
return NewService(nil, shareGetter, fn, fn2)
return NewService(nil, shareGetter, fn, fn2, nil)
}

// TestProveCommitmentAllCombinations tests proving all the commitments in a block.
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/blob/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ConstructModule() fx.Option {
getByHeightFn func(context.Context, uint64) (*header.ExtendedHeader, error),
subscribeFn func(context.Context) (<-chan *header.ExtendedHeader, error),
) *blob.Service {
return blob.NewService(state, sGetter, getByHeightFn, subscribeFn)
return blob.NewService(state, sGetter, getByHeightFn, subscribeFn, nil)
},
fx.OnStart(func(ctx context.Context, serv *blob.Service) error {
return serv.Start(ctx)
Expand Down
9 changes: 8 additions & 1 deletion nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/celestiaorg/go-fraud"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/header"
modcore "github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
Expand Down Expand Up @@ -90,8 +91,14 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
if ca == nil {
return
}
state.WithMetrics(lc, ca)
metrics, err := state.WithMetrics(lc, ca)
if err != nil {
log.Warnf("failed to initialize state metrics: %v", err)
} else {
ca.SetMetrics(metrics)
}
}),
fx.Provide(blob.WithMetrics),
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
Expand Down
6 changes: 5 additions & 1 deletion nodebuilder/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func coreAccessor(
network p2p.Network,
client *grpc.ClientConn,
additionalConns core.AdditionalCoreConns,
metrics *state.Metrics,
) (
*state.CoreAccessor,
Module,
Expand All @@ -44,7 +45,10 @@ func coreAccessor(
}
}

ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), opts...)
ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), metrics, opts...)
if err != nil {
return nil, nil, nil, err
}

sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{
Service: ca,
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option
fx.Provide(func(ks keystore.Keystore) (keyring.Keyring, AccountName, error) {
return Keyring(*cfg, ks)
}),
// Provide nil metrics as fallback when WithMetrics() is not called
fx.Provide(func() *state.Metrics {
return nil
}),
// Metrics will be provided by settings.go when enabled
fxutil.ProvideIf(coreCfg.IsEndpointConfigured(), fx.Annotate(
coreAccessor,
fx.OnStart(func(ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
stateModule "github.com/celestiaorg/celestia-node/nodebuilder/state"
)

const (
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti
_, _, err = kr.NewMnemonic(TestKeyringName, keyring.English, "", "", hd.Secp256k1)
require.NoError(t, err)
cfg.State.DefaultKeyName = TestKeyringName
_, accName, err := state.Keyring(cfg.State, ks)
_, accName, err := stateModule.Keyring(cfg.State, ks)
require.NoError(t, err)
require.Equal(t, TestKeyringName, string(accName))

Expand Down
Loading
Loading