Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
3090cbf
add blob submission metrics
gupadhyaya Sep 12, 2025
b25c77f
fix tests
gupadhyaya Sep 12, 2025
c451259
fix build errors
gupadhyaya Sep 12, 2025
931387a
fix api
gupadhyaya Sep 12, 2025
43e6746
shorten
gupadhyaya Sep 12, 2025
b4f3dde
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 12, 2025
0487333
remove duplicates
gupadhyaya Sep 16, 2025
6be3ead
fix format issue
gupadhyaya Sep 16, 2025
17a30e2
Fix bitswap compilation issues and enable metrics in tastora framework
gupadhyaya Sep 17, 2025
6bc2316
Remove HashOnRead method from BlockstoreWithMetrics
gupadhyaya Sep 17, 2025
962f115
Update tastora framework to use local Docker image with metrics
gupadhyaya Sep 17, 2025
7198aa5
Remove interface compliance check for BlockstoreWithMetrics
gupadhyaya Sep 17, 2025
80e2a6a
fix coreaccessor with new metrics instead of nil, fx.invoke issue
gupadhyaya Sep 17, 2025
af3953a
fix blob metrics
gupadhyaya Sep 17, 2025
2af16ef
go.mod changes
gupadhyaya Sep 17, 2025
06e3513
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 17, 2025
4659fbe
go mod tidy
gupadhyaya Sep 17, 2025
871bd90
blob metrics fixes
gupadhyaya Sep 18, 2025
ccbb3a3
fix
gupadhyaya Sep 18, 2025
e00d3c7
remove unnecessary method
gupadhyaya Sep 18, 2025
e98716f
remove debug stuff
gupadhyaya Sep 18, 2025
ab8d1b2
WithMetrics
gupadhyaya Sep 18, 2025
0f2a3cf
remove passing nil metrics
gupadhyaya Sep 18, 2025
5c1a2f6
private metrics, stop method
gupadhyaya Sep 18, 2025
88ba0e0
cleanup
gupadhyaya Sep 18, 2025
88e78ba
Merge branch 'main' into blob_submission_metrics
gupadhyaya Sep 18, 2025
510b48c
blob metrics changes
gupadhyaya Sep 18, 2025
0044277
remove nil checks
gupadhyaya Sep 19, 2025
ab99226
get rid of duplicate legacy state metrics
gupadhyaya Sep 19, 2025
29c9a4e
optimizing metrics recording
gupadhyaya Sep 19, 2025
1ba9c6c
error out if metrics fail
gupadhyaya Sep 19, 2025
c604d7b
named error
gupadhyaya Sep 19, 2025
74a42d1
skip resp name
gupadhyaya Sep 19, 2025
878d1bf
error as bool/enum to avoid cardinality explosion
gupadhyaya Sep 19, 2025
9356b77
golangci-lint fix
gupadhyaya Sep 19, 2025
06a3961
unify error counting
gupadhyaya Sep 19, 2025
b6f7a3d
remove magic strings
gupadhyaya Sep 22, 2025
96b085a
golangci-lint fixes
gupadhyaya Sep 23, 2025
f110c2d
further fix golangci-lint
gupadhyaya Sep 23, 2025
533f70e
fixing docker security issue
gupadhyaya Sep 23, 2025
344da58
Revert "fixing docker security issue"
gupadhyaya Sep 23, 2025
f7d2d64
Merge origin/main into blob_submission_metrics
gupadhyaya Oct 14, 2025
0de2e67
Fix compilation error after merge
gupadhyaya Oct 14, 2025
380ea9e
Apply goimports-reviser formatting to fix import organization
gupadhyaya Oct 14, 2025
5ac90dc
Fix import ordering in state/core_access.go
gupadhyaya Oct 14, 2025
b81bfc0
Fix remaining issues after merge
gupadhyaya Oct 14, 2025
4495e04
minor
gupadhyaya Oct 14, 2025
5006f04
go mod tidy
gupadhyaya Oct 14, 2025
0d3d4e4
remove unnecessary metrics
gupadhyaya Oct 21, 2025
70ad967
Merge branch 'main' into blob_submission_metrics
gupadhyaya Oct 21, 2025
c373bd9
Remove accidentally committed grafana and metrics files
gupadhyaya Oct 21, 2025
5ac205d
Fix TestListenerWithWrongChainRPC cleanup issue
gupadhyaya Oct 21, 2025
03d26e7
Merge remote-tracking branch 'origin/main' into blob_submission_metrics
gupadhyaya Oct 28, 2025
ace5b72
fix
gupadhyaya Oct 28, 2025
24a6adc
minor fix
gupadhyaya Oct 28, 2025
a0079b2
fix failing test
gupadhyaya Oct 28, 2025
592d504
cleanup
gupadhyaya Oct 28, 2025
086d7c7
remove redundant metrics, private methods, cleanup
gupadhyaya Oct 28, 2025
ac2a6f6
Merge branch 'main' into blob_submission_metrics
gupadhyaya Oct 28, 2025
3b73c24
trash clean
gupadhyaya Oct 28, 2025
d111c17
use normal counter, fix cardinality issue
gupadhyaya Oct 28, 2025
e0954dd
Merge branch 'main' into blob_submission_metrics
gupadhyaya Oct 28, 2025
54cb5cf
add gas tracking for all txs
gupadhyaya Oct 30, 2025
11cc8cb
Merge branch 'main' into blob_submission_metrics
gupadhyaya Oct 30, 2025
7a2515f
fix lint
gupadhyaya Oct 30, 2025
153e9a2
fix race
gupadhyaya Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (c *Client) initTxClient(
trustedHeadGetter{remote: c.Header},
conn,
submitCfg.Network.String(),
nil,
)
if err != nil {
return err
Expand Down
128 changes: 128 additions & 0 deletions blob/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package blob

import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// meter is the OpenTelemetry meter, named "blob", from which this package
// creates its metric instruments.
var meter = otel.Meter("blob")

// Values for the "error_type" metric attribute. Failures are bucketed into
// this fixed set of labels rather than recording the raw error text, which
// keeps the attribute's cardinality bounded.
const (
// errorTypeNotFound labels failures that match ErrBlobNotFound.
errorTypeNotFound = "not_found"
// errorTypeTimeout labels failures that match context.DeadlineExceeded.
errorTypeTimeout = "timeout"
// errorTypeCanceled labels failures that match context.Canceled.
errorTypeCanceled = "canceled"
// errorTypeUnknown labels every failure that matches none of the above.
errorTypeUnknown = "unknown"
)

// metrics bundles the OpenTelemetry instruments recorded by the blob Service.
//
// A nil *metrics is usable: the observe* methods return immediately on a nil
// receiver, so a Service without metrics needs no checks at its call sites.
type metrics struct {
	// Latency histograms for retrieval and proof operations, recorded in
	// seconds.
	retrievalDuration, proofDuration metric.Float64Histogram

	// Counters incremented once per observed retrieval and proof operation.
	retrievalTotal, proofTotal metric.Int64Counter
}

// WithMetrics creates the blob retrieval and proof instruments on the package
// meter and installs them on the Service.
//
// Instruments are created in a fixed order: retrieval histogram, proof
// histogram, retrieval counter, proof counter. The first creation error is
// returned unchanged, and s.metrics is assigned only after all four
// instruments have been created.
func (s *Service) WithMetrics() error {
	// Both histograms measure seconds, so they share one constructor.
	newDurationHistogram := func(name, description string) (metric.Float64Histogram, error) {
		return meter.Float64Histogram(
			name,
			metric.WithDescription(description),
			metric.WithUnit("s"),
		)
	}
	newCounter := func(name, description string) (metric.Int64Counter, error) {
		return meter.Int64Counter(name, metric.WithDescription(description))
	}

	var (
		instruments metrics
		err         error
	)

	instruments.retrievalDuration, err = newDurationHistogram(
		"blob_retrieval_duration_seconds",
		"Duration of blob retrieval operations",
	)
	if err != nil {
		return err
	}

	instruments.proofDuration, err = newDurationHistogram(
		"blob_proof_duration_seconds",
		"Duration of blob proof operations",
	)
	if err != nil {
		return err
	}

	instruments.retrievalTotal, err = newCounter(
		"blob_retrieval_total",
		"Total number of blob retrievals",
	)
	if err != nil {
		return err
	}

	instruments.proofTotal, err = newCounter(
		"blob_proof_total",
		"Total number of blob proofs",
	)
	if err != nil {
		return err
	}

	// Publish only a fully populated set of instruments.
	s.metrics = &instruments
	return nil
}

// observeRetrieval records one blob retrieval: its duration in seconds on the
// retrieval histogram and a single increment of the retrieval counter.
//
// A nil receiver is a no-op. When err is nil the measurements carry no
// attributes; otherwise they carry an "error_type" attribute drawn from a
// fixed set of labels, so the attribute's cardinality stays bounded.
func (m *metrics) observeRetrieval(ctx context.Context, duration time.Duration, err error) {
	if m == nil {
		return
	}

	var attrs []attribute.KeyValue
	if err != nil {
		// The order of the checks matters: the first matching case wins.
		var kind string
		switch {
		case errors.Is(err, ErrBlobNotFound):
			kind = errorTypeNotFound
		case errors.Is(err, context.DeadlineExceeded):
			kind = errorTypeTimeout
		case errors.Is(err, context.Canceled):
			kind = errorTypeCanceled
		default:
			kind = errorTypeUnknown
		}
		attrs = append(attrs, attribute.String("error_type", kind))
	}

	// Both instruments receive the same attributes, so build the option once.
	opt := metric.WithAttributes(attrs...)
	m.retrievalDuration.Record(ctx, duration.Seconds(), opt)
	m.retrievalTotal.Add(ctx, 1, opt)
}

// observeProof records one blob proof operation: its duration in seconds on
// the proof histogram and a single increment of the proof counter.
//
// A nil receiver is a no-op. When err is nil the measurements carry no
// attributes; otherwise they carry an "error_type" attribute of "timeout",
// "canceled" or "unknown".
func (m *metrics) observeProof(ctx context.Context, duration time.Duration, err error) {
	if m == nil {
		return
	}

	var attrs []attribute.KeyValue
	if err != nil {
		// The order of the checks matters: the first matching case wins.
		var kind string
		switch {
		case errors.Is(err, context.DeadlineExceeded):
			kind = errorTypeTimeout
		case errors.Is(err, context.Canceled):
			kind = errorTypeCanceled
		default:
			kind = errorTypeUnknown
		}
		attrs = append(attrs, attribute.String("error_type", kind))
	}

	// Both instruments receive the same attributes, so build the option once.
	opt := metric.WithAttributes(attrs...)
	m.proofDuration.Record(ctx, duration.Seconds(), opt)
	m.proofTotal.Add(ctx, 1, opt)
}
9 changes: 9 additions & 0 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"slices"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/types"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -61,6 +62,8 @@ type Service struct {
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
// headerSub subscribes to new headers to supply to blob subscriptions.
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)
// metrics tracks blob-related metrics
metrics *metrics
}

func NewService(
Expand All @@ -74,6 +77,7 @@ func NewService(
shareGetter: getter,
headerGetter: headerGetter,
headerSub: headerSub,
metrics: nil, // Will be initialized via WithMetrics() if needed
}
}

Expand All @@ -84,6 +88,7 @@ func (s *Service) Start(context.Context) error {

func (s *Service) Stop(context.Context) error {
s.cancel()
// No cleanup needed for metrics
return nil
}

Expand Down Expand Up @@ -196,9 +201,11 @@ func (s *Service) Get(
namespace libshare.Namespace,
commitment Commitment,
) (blob *Blob, err error) {
start := time.Now()
ctx, span := tracer.Start(ctx, "get")
defer func() {
utils.SetStatusAndEnd(span, err)
s.metrics.observeRetrieval(ctx, time.Since(start), err)
}()
span.SetAttributes(
attribute.Int64("height", int64(height)),
Expand All @@ -222,9 +229,11 @@ func (s *Service) GetProof(
namespace libshare.Namespace,
commitment Commitment,
) (proof *Proof, err error) {
start := time.Now()
ctx, span := tracer.Start(ctx, "get-proof")
defer func() {
utils.SetStatusAndEnd(span, err)
s.metrics.observeProof(ctx, time.Since(start), err)
}()
span.SetAttributes(
attribute.Int64("height", int64(height)),
Expand Down
5 changes: 4 additions & 1 deletion core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func TestListenerWithWrongChainRPC(t *testing.T) {
// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.Genesis.ChainID = testChainID
fetcher, _ := createCoreFetcher(t, cfg)
fetcher, network := createCoreFetcher(t, cfg)
t.Cleanup(func() {
require.NoError(t, network.Stop())
})
eds := createEdsPubSub(ctx, t)

store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
Expand Down
1 change: 1 addition & 0 deletions nodebuilder/da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Module interface {

// Validate validates Commitments against the corresponding Proofs. This should be possible without retrieving the
// Blobs.
//
// Deprecated: This method is deprecated and will be removed in the future.
Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace da.Namespace) ([]bool, error)
}
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/header/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (cfg *Config) trustedPeers(bpeers p2p.Bootstrappers) (infos []peer.AddrInfo
}
infos[i] = *p
}
return infos, err
return infos, nil
}

// Validate performs basic validation of the config.
Expand Down
18 changes: 15 additions & 3 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/celestiaorg/go-fraud"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/header"
modcore "github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
Expand Down Expand Up @@ -86,11 +87,22 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) {
fx.Invoke(func(ca *state.CoreAccessor) error {
if ca == nil {
return
return nil
}
state.WithMetrics(lc, ca)
err := ca.WithMetrics()
if err != nil {
return fmt.Errorf("failed to initialize state metrics: %w", err)
}
return nil
}),
fx.Invoke(func(serv *blob.Service) error {
err := serv.WithMetrics()
if err != nil {
return fmt.Errorf("failed to initialize blob metrics: %w", err)
}
return nil
}),
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func coreAccessor(
opts = append(opts, state.WithTxWorkerAccounts(cfg.TxWorkerAccounts))
}

ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), opts...)
ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), nil, opts...)

sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{
Service: ca,
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
stateModule "github.com/celestiaorg/celestia-node/nodebuilder/state"
)

const (
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti
_, _, err = kr.NewMnemonic(TestKeyringName, keyring.English, "", "", hd.Secp256k1)
require.NoError(t, err)
cfg.State.DefaultKeyName = TestKeyringName
_, accName, err := state.Keyring(cfg.State, ks)
_, accName, err := stateModule.Keyring(cfg.State, ks)
require.NoError(t, err)
require.Equal(t, TestKeyringName, string(accName))

Expand Down
8 changes: 6 additions & 2 deletions pruner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ func (s *Service) WithMetrics() error {
}

callback := func(_ context.Context, observer metric.Observer) error {
observer.ObserveInt64(lastPruned, int64(s.checkpoint.LastPrunedHeight))
observer.ObserveInt64(failedPrunes, int64(len(s.checkpoint.FailedHeaders)))
s.checkpointMu.Lock()
if s.checkpoint != nil {
observer.ObserveInt64(lastPruned, int64(s.checkpoint.LastPrunedHeight))
observer.ObserveInt64(failedPrunes, int64(len(s.checkpoint.FailedHeaders)))
}
s.checkpointMu.Unlock()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion share/eds/retriever_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func TestRetriever_ByzantineError(t *testing.T) {
const width = 8
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

bserv := ipld.NewMemBlockservice()
Expand Down
2 changes: 1 addition & 1 deletion share/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func RowsWithNamespace(root *AxisRoots, namespace libshare.Namespace) (idxs []in
idxs = append(idxs, i)
}
}
return idxs, err
return idxs, nil
}

// RootHashForCoordinates returns the root hash for the given coordinates.
Expand Down
Loading
Loading