diff --git a/api/client/client.go b/api/client/client.go index 0d33829445..46205914cc 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -97,6 +97,7 @@ func (c *Client) initTxClient( trustedHeadGetter{remote: c.Header}, conn, submitCfg.Network.String(), + nil, // metrics not available in client context ) if err != nil { return err @@ -108,7 +109,7 @@ func (c *Client) initTxClient( c.State = core // setup blob submission service using core - blobSvc := blob.NewService(core, nil, nil, nil) + blobSvc := blob.NewService(core, nil, nil, nil) // metrics not available in client context err = blobSvc.Start(ctx) if err != nil { return err diff --git a/blob/metrics.go b/blob/metrics.go new file mode 100644 index 0000000000..e14f9aec55 --- /dev/null +++ b/blob/metrics.go @@ -0,0 +1,192 @@ +package blob + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var meter = otel.Meter("blob") + +// Error type constants for metrics labels +const ( + errorTypeNotFound = "not_found" + errorTypeTimeout = "timeout" + errorTypeCanceled = "canceled" + errorTypeUnknown = "unknown" +) + +// metrics tracks blob-related metrics +type metrics struct { + // Retrieval metrics + retrievalCounter metric.Int64Counter + retrievalDuration metric.Float64Histogram + + // Proof metrics + proofCounter metric.Int64Counter + proofDuration metric.Float64Histogram + + // Internal counters (thread-safe) + totalRetrievals atomic.Int64 + totalRetrievalErrors atomic.Int64 + totalProofs atomic.Int64 + totalProofErrors atomic.Int64 + + // Client registration for cleanup + clientReg metric.Registration +} + +// WithMetrics initializes metrics for the Service +func (s *Service) WithMetrics() error { + // Retrieval metrics + retrievalCounter, err := meter.Int64Counter( + "blob_retrieval_total", + metric.WithDescription("Total number of blob retrieval operations"), + ) + if err != nil { + return err + } + + retrievalDuration, err := meter.Float64Histogram( + "blob_retrieval_duration_seconds", + metric.WithDescription("Duration of blob retrieval operations"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + // Proof metrics + proofCounter, err := meter.Int64Counter( + "blob_proof_total", + metric.WithDescription("Total number of blob proof operations"), + ) + if err != nil { + return err + } + + proofDuration, err := meter.Float64Histogram( + "blob_proof_duration_seconds", + metric.WithDescription("Duration of blob proof operations"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + m := &metrics{ + retrievalCounter: retrievalCounter, + retrievalDuration: retrievalDuration, + proofCounter: proofCounter, + proofDuration: proofDuration, + } + + // Register observable metrics + retrievalTotal, err := meter.Int64ObservableCounter( + "blob_retrieval_total_observable", + metric.WithDescription("Observable total number of blob retrievals"), + ) + if err != nil { + return err + } + + proofTotal, err := meter.Int64ObservableCounter( + "blob_proof_total_observable", + metric.WithDescription("Observable total number of blob proofs"), + ) + if err != nil { + return err + } + + callback := func(_ context.Context, observer metric.Observer) error { + observer.ObserveInt64(retrievalTotal, m.totalRetrievals.Load()) + observer.ObserveInt64(proofTotal, m.totalProofs.Load()) + return nil + } + + clientReg, err := meter.RegisterCallback(callback, retrievalTotal, proofTotal) + if err != nil { + return err + } + + m.clientReg = clientReg + s.metrics = m + return nil +} + +// Stop cleans up metrics resources +func (m *metrics) Stop() error { + if m == nil || m.clientReg == nil { + return nil + } + return m.clientReg.Unregister() +} + +// ObserveRetrieval records blob retrieval metrics +func (m *metrics) ObserveRetrieval(ctx context.Context, duration time.Duration, err error) { + if m == nil { + return + } + + // Update counters + m.totalRetrievals.Add(1) + if err != nil { + m.totalRetrievalErrors.Add(1) + } + + // Record metrics with error type enum to avoid cardinality explosion + attrs := []attribute.KeyValue{} + if err != nil { + errorType := errorTypeUnknown + switch { + case errors.Is(err, ErrBlobNotFound): + errorType = errorTypeNotFound + case errors.Is(err, context.DeadlineExceeded): + errorType = errorTypeTimeout + case errors.Is(err, context.Canceled): + errorType = errorTypeCanceled + } + attrs = append(attrs, attribute.String("error_type", errorType)) + } + + // Use single counter with error_type enum + m.retrievalCounter.Add(ctx, 1, metric.WithAttributes(attrs...)) + + m.retrievalDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +// ObserveProof records blob proof metrics +func (m *metrics) ObserveProof(ctx context.Context, duration time.Duration, err error) { + if m == nil { + return + } + + // Update counters + m.totalProofs.Add(1) + if err != nil { + m.totalProofErrors.Add(1) + } + + // Record metrics with error type enum to avoid cardinality explosion + attrs := []attribute.KeyValue{} + if err != nil { + errorType := errorTypeUnknown + switch { + case errors.Is(err, context.DeadlineExceeded): + errorType = errorTypeTimeout + case errors.Is(err, context.Canceled): + errorType = errorTypeCanceled + } + attrs = append(attrs, attribute.String("error_type", errorType)) + } + + // Use single counter with error_type enum + m.proofCounter.Add(ctx, 1, metric.WithAttributes(attrs...)) + + m.proofDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} diff --git a/blob/service.go b/blob/service.go index 8bf89311fc..f95f193fb3 100644 --- a/blob/service.go +++ b/blob/service.go @@ -8,6 +8,7 @@ import ( "fmt" "slices" "sync" + "time" "github.com/cosmos/cosmos-sdk/types" logging "github.com/ipfs/go-log/v2" @@ -61,6 +62,8 @@ type Service struct { headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error) // headerSub subscribes to new headers to supply to blob subscriptions. headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error) + // metrics tracks blob-related metrics + metrics *metrics } func NewService( @@ -74,6 +77,7 @@ func NewService( shareGetter: getter, headerGetter: headerGetter, headerSub: headerSub, + metrics: nil, // Will be initialized via WithMetrics() if needed } } @@ -84,6 +88,12 @@ func (s *Service) Start(context.Context) error { func (s *Service) Stop(context.Context) error { s.cancel() + // Stop metrics if they exist + if s.metrics != nil { + if err := s.metrics.Stop(); err != nil { + return err + } + } return nil } @@ -196,9 +206,12 @@ func (s *Service) Get( namespace libshare.Namespace, commitment Commitment, ) (blob *Blob, err error) { + start := time.Now() ctx, span := tracer.Start(ctx, "get") defer func() { utils.SetStatusAndEnd(span, err) + // Record metrics + s.metrics.ObserveRetrieval(ctx, time.Since(start), err) }() span.SetAttributes( attribute.Int64("height", int64(height)), @@ -222,9 +235,12 @@ func (s *Service) GetProof( namespace libshare.Namespace, commitment Commitment, ) (proof *Proof, err error) { + start := time.Now() ctx, span := tracer.Start(ctx, "get-proof") defer func() { utils.SetStatusAndEnd(span, err) + // Record metrics + s.metrics.ObserveProof(ctx, time.Since(start), err) }() span.SetAttributes( attribute.Int64("height", int64(height)), diff --git a/core/listener_test.go b/core/listener_test.go index 7bbd29170d..3850ea968c 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -81,7 +81,10 @@ func TestListenerWithWrongChainRPC(t *testing.T) { // create one block to store as Head in local store and then unsubscribe from block events cfg := DefaultTestConfig() cfg.Genesis.ChainID = testChainID - fetcher, _ := createCoreFetcher(t, cfg) + fetcher, network := createCoreFetcher(t, cfg) + t.Cleanup(func() { + require.NoError(t, network.Stop()) + }) eds := createEdsPubSub(ctx, t) store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) diff --git a/nodebuilder/header/config.go b/nodebuilder/header/config.go index dceac3196b..56684ceeba 100644 --- a/nodebuilder/header/config.go +++ b/nodebuilder/header/config.go @@ -79,7 +79,7 @@ func (cfg *Config) trustedPeers(bpeers p2p.Bootstrappers) (infos []peer.AddrInfo } infos[i] = *p } - return infos, err + return infos, nil } // Validate performs basic validation of the config. diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index a5e0653cb7..cdfc0fedaa 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -23,6 +23,7 @@ import ( "github.com/celestiaorg/go-fraud" + "github.com/celestiaorg/celestia-node/blob" "github.com/celestiaorg/celestia-node/header" modcore "github.com/celestiaorg/celestia-node/nodebuilder/core" "github.com/celestiaorg/celestia-node/nodebuilder/das" @@ -86,11 +87,22 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti baseComponents := fx.Options( fx.Supply(metricOpts), fx.Invoke(initializeMetrics), - fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) { + fx.Invoke(func(ca *state.CoreAccessor) error { if ca == nil { - return + return nil } - state.WithMetrics(lc, ca) + err := ca.WithMetrics() + if err != nil { + return fmt.Errorf("failed to initialize state metrics: %w", err) + } + return nil + }), + fx.Invoke(func(serv *blob.Service) error { + err := serv.WithMetrics() + if err != nil { + return fmt.Errorf("failed to initialize blob metrics: %w", err) + } + return nil }), fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]), fx.Invoke(node.WithMetrics), diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 07ac0ed9ec..188ad03bed 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -49,7 +49,7 @@ func coreAccessor( opts = append(opts, state.WithTxWorkerAccounts(cfg.TxWorkerAccounts)) } - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), opts...) + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String(), nil, opts...) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ Service: ca, diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index 0e80ab3209..71e0652f8f 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -29,6 +29,7 @@ func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option fx.Provide(func(ks keystore.Keystore) (keyring.Keyring, AccountName, error) { return Keyring(*cfg, ks) }), + // Metrics will be provided by settings.go when enabled fxutil.ProvideIf(coreCfg.IsEndpointConfigured(), fx.Annotate( coreAccessor, fx.OnStart(func(ctx context.Context, diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index 3e101b810f..b9d4c4f2d4 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -19,7 +19,7 @@ import ( "github.com/celestiaorg/celestia-node/libs/fxutil" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/nodebuilder/state" + stateModule "github.com/celestiaorg/celestia-node/nodebuilder/state" ) const ( @@ -61,7 +61,7 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti _, _, err = kr.NewMnemonic(TestKeyringName, keyring.English, "", "", hd.Secp256k1) require.NoError(t, err) cfg.State.DefaultKeyName = TestKeyringName - _, accName, err := state.Keyring(cfg.State, ks) + _, accName, err := stateModule.Keyring(cfg.State, ks) require.NoError(t, err) require.Equal(t, TestKeyringName, string(accName)) diff --git a/share/root.go b/share/root.go index 5355c25ad0..af6e42a229 100644 --- a/share/root.go +++ b/share/root.go @@ -68,7 +68,7 @@ func RowsWithNamespace(root *AxisRoots, namespace libshare.Namespace) (idxs []in idxs = append(idxs, i) } } - return idxs, err + return idxs, nil } // RootHashForCoordinates returns the root hash for the given coordinates. diff --git a/state/core_access.go b/state/core_access.go index 7a548d450c..f0b2a4f3be 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -80,6 +80,9 @@ type CoreAccessor struct { estimatorServiceTLS bool estimatorConn *grpc.ClientConn + // metrics tracks state-related metrics + metrics *metrics + // these fields are mutatable and thus need to be protected by a mutex lock sync.Mutex lastPayForBlob int64 @@ -95,6 +98,7 @@ func NewCoreAccessor( getter libhead.Head[*header.ExtendedHeader], conn *grpc.ClientConn, network string, + metrics *metrics, opts ...Option, ) (*CoreAccessor, error) { // create verifier @@ -119,6 +123,7 @@ func NewCoreAccessor( prt: prt, coreConns: []*grpc.ClientConn{conn}, network: network, + metrics: metrics, } for _, opt := range opts { @@ -164,6 +169,11 @@ func (ca *CoreAccessor) Stop(_ context.Context) error { ca.estimatorConn = nil } + // Stop metrics if they exist + if err := ca.metrics.Stop(); err != nil { + return err + } + return nil } @@ -174,11 +184,25 @@ func (ca *CoreAccessor) SubmitPayForBlob( ctx context.Context, libBlobs []*libshare.Blob, cfg *TxConfig, -) (*TxResponse, error) { +) (_ *TxResponse, err error) { + start := time.Now() if len(libBlobs) == 0 { return nil, errors.New("state: no blobs provided") } + // Calculate blob metrics - optimized single pass + totalSize := int64(0) + for _, blob := range libBlobs { + totalSize += int64(len(blob.Data())) + } + + var gasEstimationDuration time.Duration + + // Use defer to ensure metrics are recorded exactly once at the end + defer func() { + ca.metrics.ObservePfbSubmission(ctx, time.Since(start), len(libBlobs), totalSize, gasEstimationDuration, 0, err) + }() + client, err := ca.getTxClient(ctx) if err != nil { return nil, err @@ -193,25 +217,45 @@ func (ca *CoreAccessor) SubmitPayForBlob( feeGrant = user.SetFeeGranter(granter) } - // get tx signer account name - author, err := ca.getTxAuthorAccAddress(cfg) - if err != nil { - return nil, err - } - account := ca.client.AccountByAddress(ctx, author) - if account == nil { - return nil, fmt.Errorf("account for signer %s not found", author) - } - + // Gas estimation with metrics - only record when actual estimation occurs gas := cfg.GasLimit() + var author AccAddress + if gas == 0 { + gasEstimationStart := time.Now() + // get tx signer account name first for gas estimation + author, err = ca.getTxAuthorAccAddress(cfg) + if err != nil { + return nil, err + } gas, err = ca.estimateGasForBlobs(author.String(), libBlobs) + gasEstimationDuration = time.Since(gasEstimationStart) + ca.metrics.ObserveGasEstimation(ctx, gasEstimationDuration, err) + if err != nil { + return nil, err + } + } else { + // get tx signer account name + author, err = ca.getTxAuthorAccAddress(cfg) if err != nil { return nil, err } } + // Account query with metrics + accountQueryStart := time.Now() + account := ca.client.AccountByAddress(ctx, author) + ca.metrics.ObserveAccountQuery(ctx, time.Since(accountQueryStart), nil) + + if account == nil { + err = fmt.Errorf("account for signer %s not found", author) + return nil, err + } + + // Gas price estimation with metrics + gasPriceEstimationStart := time.Now() gasPrice, err := ca.estimateGasPrice(ctx, cfg) + ca.metrics.ObserveGasPriceEstimation(ctx, time.Since(gasPriceEstimationStart), err) if err != nil { return nil, err } diff --git a/state/core_access_test.go b/state/core_access_test.go index 8e9bf2f1e9..2226864b89 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -257,7 +257,7 @@ func TestParallelPayForBlobSubmission(t *testing.T) { conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, WithTxWorkerAccounts(workerAccounts)) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, nil, WithTxWorkerAccounts(workerAccounts)) require.NoError(t, err) err = ca.Start(ctx) require.NoError(t, err) @@ -337,7 +337,7 @@ func TestTxWorkerSetup(t *testing.T) { keyring.DefaultBIP39Passphrase, hd.Secp256k1) require.NoError(t, err) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, WithTxWorkerAccounts(8)) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, nil, WithTxWorkerAccounts(8)) require.NoError(t, err) err = ca.Start(ctx) require.NoError(t, err) @@ -362,7 +362,7 @@ func buildAccessor(t *testing.T, opts ...Option) (*CoreAccessor, []string) { conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, opts...) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0], nil, conn, chainID, nil, opts...) require.NoError(t, err) return ca, accounts } diff --git a/state/integration_test.go b/state/integration_test.go index cf1da0d83c..ce3b7035b4 100644 --- a/state/integration_test.go +++ b/state/integration_test.go @@ -54,7 +54,7 @@ func (s *IntegrationTestSuite) SetupSuite() { s.Require().Greater(len(s.accounts), 0) accountName := s.accounts[0].Name - accessor, err := NewCoreAccessor(s.network.Keyring, accountName, localHeader{s.network.Client}, nil, "") + accessor, err := NewCoreAccessor(s.network.Keyring, accountName, localHeader{s.network.Client}, nil, "", nil) require.NoError(s.T(), err) ctx, cancel := context.WithCancel(context.Background()) accessor.ctx = ctx diff --git a/state/metrics.go b/state/metrics.go index a9f0074176..b0a44805a8 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -2,41 +2,374 @@ package state import ( "context" + "sync/atomic" + "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.uber.org/fx" ) var meter = otel.Meter("state") -func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) { - pfbCounter, _ := meter.Int64ObservableCounter( - "pfb_count", - metric.WithDescription("Total count of submitted PayForBlob transactions"), +// Attribute name constants for metrics labels +const ( + attrBlobCount = "blob_count" + attrTotalSize = "total_size_bytes" + attrGasPrice = "gas_price_utia" + attrSuccess = "success" +) + +// metrics tracks state-related metrics +type metrics struct { + // PFB submission metrics + pfbSubmissionCounter metric.Int64Counter + pfbSubmissionDuration metric.Float64Histogram + pfbSubmissionErrors metric.Int64Counter + pfbSubmissionBlobCount metric.Int64Counter + pfbSubmissionBlobSize metric.Int64Counter + pfbGasEstimationDuration metric.Float64Histogram + pfbGasPriceEstimation metric.Float64Histogram + + // Gas estimation metrics + gasEstimationDuration metric.Float64Histogram + gasEstimationErrors metric.Int64Counter + + // Gas price estimation metrics + gasPriceEstimationDuration metric.Float64Histogram + gasPriceEstimationErrors metric.Int64Counter + + // Account operations metrics + accountQueryDuration metric.Float64Histogram + accountQueryErrors metric.Int64Counter + + // Internal counters (thread-safe) + totalPfbSubmissions atomic.Int64 + totalPfbSubmissionErrors atomic.Int64 + totalGasEstimations atomic.Int64 + totalGasEstimationErrors atomic.Int64 + totalGasPriceEstimations atomic.Int64 + totalGasPriceEstimationErrors atomic.Int64 + totalAccountQueries atomic.Int64 + totalAccountQueryErrors atomic.Int64 + + // Client registration for cleanup + clientReg metric.Registration +} + +// WithMetrics initializes metrics for the CoreAccessor +func (ca *CoreAccessor) WithMetrics() error { + // PFB submission metrics + pfbSubmissionCounter, err := meter.Int64Counter( + "state_pfb_submission_total", + metric.WithDescription("Total number of PayForBlob submissions"), ) - lastPfbTimestamp, _ := meter.Int64ObservableCounter( - "last_pfb_timestamp", - metric.WithDescription("Timestamp of the last submitted PayForBlob transaction"), + if err != nil { + return err + } + + pfbSubmissionDuration, err := meter.Float64Histogram( + "state_pfb_submission_duration_seconds", + metric.WithDescription("Duration of PayForBlob submission operations"), + metric.WithUnit("s"), ) + if err != nil { + return err + } + + pfbSubmissionErrors, err := meter.Int64Counter( + "state_pfb_submission_errors_total", + metric.WithDescription("Total number of PayForBlob submission errors"), + ) + if err != nil { + return err + } + + pfbSubmissionBlobCount, err := meter.Int64Counter( + "state_pfb_submission_blob_count_total", + metric.WithDescription("Total number of blobs in PayForBlob submissions"), + ) + if err != nil { + return err + } + + pfbSubmissionBlobSize, err := meter.Int64Counter( + "state_pfb_submission_blob_size_bytes_total", + metric.WithDescription("Total size of blobs in PayForBlob submissions in bytes"), + ) + if err != nil { + return err + } + + pfbGasEstimationDuration, err := meter.Float64Histogram( + "state_pfb_gas_estimation_duration_seconds", + metric.WithDescription("Duration of gas estimation for PayForBlob"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + pfbGasPriceEstimation, err := meter.Float64Histogram( + "state_pfb_gas_price_estimation", + metric.WithDescription("Estimated gas price for PayForBlob"), + metric.WithUnit("utia"), + ) + if err != nil { + return err + } + + // Gas estimation metrics + gasEstimationDuration, err := meter.Float64Histogram( + "state_gas_estimation_duration_seconds", + metric.WithDescription("Duration of gas estimation operations"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + gasEstimationErrors, err := meter.Int64Counter( + "state_gas_estimation_errors_total", + metric.WithDescription("Total number of gas estimation errors"), + ) + if err != nil { + return err + } + + // Gas price estimation metrics + gasPriceEstimationDuration, err := meter.Float64Histogram( + "state_gas_price_estimation_duration_seconds", + metric.WithDescription("Duration of gas price estimation operations"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + gasPriceEstimationErrors, err := meter.Int64Counter( + "state_gas_price_estimation_errors_total", + metric.WithDescription("Total number of gas price estimation errors"), + ) + if err != nil { + return err + } + + // Account operations metrics + accountQueryDuration, err := meter.Float64Histogram( + "state_account_query_duration_seconds", + metric.WithDescription("Duration of account query operations"), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + + accountQueryErrors, err := meter.Int64Counter( + "state_account_query_errors_total", + metric.WithDescription("Total number of account query errors"), + ) + if err != nil { + return err + } + + m := &metrics{ + pfbSubmissionCounter: pfbSubmissionCounter, + pfbSubmissionDuration: pfbSubmissionDuration, + pfbSubmissionErrors: pfbSubmissionErrors, + pfbSubmissionBlobCount: pfbSubmissionBlobCount, + pfbSubmissionBlobSize: pfbSubmissionBlobSize, + pfbGasEstimationDuration: pfbGasEstimationDuration, + pfbGasPriceEstimation: pfbGasPriceEstimation, + gasEstimationDuration: gasEstimationDuration, + gasEstimationErrors: gasEstimationErrors, + gasPriceEstimationDuration: gasPriceEstimationDuration, + gasPriceEstimationErrors: gasPriceEstimationErrors, + accountQueryDuration: accountQueryDuration, + accountQueryErrors: accountQueryErrors, + } + + // Register observable metrics for OTLP export + pfbSubmissionObservable, err := meter.Int64ObservableCounter( + "state_pfb_submission_total_observable", + metric.WithDescription("Observable total number of PayForBlob submissions"), + ) + if err != nil { + return err + } + + gasEstimationObservable, err := meter.Int64ObservableCounter( + "state_gas_estimation_total_observable", + metric.WithDescription("Observable total number of gas estimation operations"), + ) + if err != nil { + return err + } + + gasPriceEstimationObservable, err := meter.Int64ObservableCounter( + "state_gas_price_estimation_total_observable", + metric.WithDescription("Observable total number of gas price estimation operations"), + ) + if err != nil { + return err + } + + accountQueryObservable, err := meter.Int64ObservableCounter( + "state_account_query_total_observable", + metric.WithDescription("Observable total number of account query operations"), + ) + if err != nil { + return err + } callback := func(_ context.Context, observer metric.Observer) error { - observer.ObserveInt64(pfbCounter, ca.PayForBlobCount()) - observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob()) + // Observable metrics for OTLP export + observer.ObserveInt64(pfbSubmissionObservable, m.totalPfbSubmissions.Load()) + observer.ObserveInt64(gasEstimationObservable, m.totalGasEstimations.Load()) + observer.ObserveInt64(gasPriceEstimationObservable, m.totalGasPriceEstimations.Load()) + observer.ObserveInt64(accountQueryObservable, m.totalAccountQueries.Load()) + return nil + } + + clientReg, err := meter.RegisterCallback( + callback, + pfbSubmissionObservable, + gasEstimationObservable, + gasPriceEstimationObservable, + accountQueryObservable, + ) + if err != nil { + log.Errorf("Failed to register metrics callback: %v", err) + return err + } + + // Update the CoreAccessor with the new metrics + ca.metrics = m + + // Store the client registration for cleanup + m.clientReg = clientReg + + return nil +} + +// Stop cleans up the metrics resources +func (m *metrics) Stop() error { + if m == nil || m.clientReg == nil { return nil } + if err := m.clientReg.Unregister(); err != nil { + log.Warnw("failed to close metrics", "err", err) + return err + } + return nil +} + +// ObservePfbSubmission records PayForBlob submission metrics +func (m *metrics) ObservePfbSubmission( + ctx context.Context, + duration time.Duration, + blobCount int, + totalSize int64, + gasEstimationDuration time.Duration, + gasPrice float64, + err error, +) { + if m == nil { + return + } + + // Update counters + m.totalPfbSubmissions.Add(1) + if err != nil { + m.totalPfbSubmissionErrors.Add(1) + } + + // Record metrics + attrs := []attribute.KeyValue{ + attribute.Int(attrBlobCount, blobCount), + attribute.Int64(attrTotalSize, totalSize), + attribute.Float64(attrGasPrice, gasPrice), + attribute.Bool(attrSuccess, err == nil), + } + + if err != nil { + m.pfbSubmissionErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) + } else { + m.pfbSubmissionCounter.Add(ctx, 1, metric.WithAttributes(attrs...)) + } + + m.pfbSubmissionDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) + m.pfbSubmissionBlobCount.Add(ctx, int64(blobCount), metric.WithAttributes(attrs...)) + m.pfbSubmissionBlobSize.Add(ctx, totalSize, metric.WithAttributes(attrs...)) + m.pfbGasEstimationDuration.Record(ctx, gasEstimationDuration.Seconds(), metric.WithAttributes(attrs...)) + m.pfbGasPriceEstimation.Record(ctx, gasPrice, metric.WithAttributes(attrs...)) +} + +// ObserveGasEstimation records gas estimation metrics +func (m *metrics) ObserveGasEstimation(ctx context.Context, duration time.Duration, err error) { + if m == nil { + return + } + + // Update counters + m.totalGasEstimations.Add(1) + if err != nil { + m.totalGasEstimationErrors.Add(1) + } + + // Record metrics + attrs := []attribute.KeyValue{ + attribute.Bool(attrSuccess, err == nil), + } + if err != nil { + m.gasEstimationErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) + } + + m.gasEstimationDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +// ObserveGasPriceEstimation records gas price estimation metrics +func (m *metrics) ObserveGasPriceEstimation(ctx context.Context, duration time.Duration, err error) { + if m == nil { + return + } + + // Update counters + m.totalGasPriceEstimations.Add(1) + if err != nil { + m.totalGasPriceEstimationErrors.Add(1) + } + + // Record metrics + attrs := []attribute.KeyValue{ + attribute.Bool(attrSuccess, err == nil), + } + if err != nil { + m.gasPriceEstimationErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) + } - clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) + m.gasPriceEstimationDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +// ObserveAccountQuery records account query metrics +func (m *metrics) ObserveAccountQuery(ctx context.Context, duration time.Duration, err error) { + if m == nil { + return + } + + // Update counters + m.totalAccountQueries.Add(1) + if err != nil { + m.totalAccountQueryErrors.Add(1) + } + + // Record metrics + attrs := []attribute.KeyValue{ + attribute.Bool(attrSuccess, err == nil), + } if err != nil { - panic(err) + m.accountQueryErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) } - lc.Append(fx.Hook{ - OnStop: func(context.Context) error { - if err := clientReg.Unregister(); err != nil { - log.Warnw("failed to close metrics", "err", err) - } - return nil - }, - }) + m.accountQueryDuration.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) }