Skip to content

Commit

Permalink
refactor: default to prometheus.DefaultRegisterer (#722)
Browse files Browse the repository at this point in the history
- removes calls to prometheus.NewRegistry()
- replaces NewRegistry() call with global `prometheus.DefaultRegisterer`
  so by default boxo users who did not specify custom registry
  are not missing any metrics.
- ensures we don't panic if tests run in parallel and DefaultRegisterer is used
  • Loading branch information
lidel authored Nov 27, 2024
1 parent 821f539 commit cdefbf2
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The following emojis are used to highlight certain changes:

- The above is only necessary if content routing is needed. Otherwise:

```
```go
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter set to nil (disable content discovery)
Expand All @@ -64,6 +64,8 @@ The following emojis are used to highlight certain changes:

- `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing). [#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724)
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`. [#720](https://github.com/ipfs/boxo/pull/720)
- `routing/http/server`: exposes Prometheus metrics on `prometheus.DefaultRegisterer` and a custom one can be provided via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)

### Changed

Expand Down
19 changes: 12 additions & 7 deletions gateway/backend_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewCarBackend(f CarFetcher, opts ...BackendOption) (*CarBackend, error) {
return nil, err
}

var promReg prometheus.Registerer = prometheus.NewRegistry()
var promReg prometheus.Registerer = prometheus.DefaultRegisterer
if compiledOptions.promRegistry != nil {
promReg = compiledOptions.promRegistry
}
Expand Down Expand Up @@ -117,6 +117,11 @@ func NewRemoteCarBackend(gatewayURL []string, httpClient *http.Client, opts ...B
}

func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics {
// make sure we have functional registry
if promReg == nil {
promReg = prometheus.DefaultRegisterer
}

// How many CAR Fetch attempts we had? Need this to calculate % of various car request types.
// We only count attempts here, because success/failure with/without retries are provided by caboose:
// - ipfs_caboose_fetch_duration_car_success_count
Expand All @@ -129,15 +134,15 @@ func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics
Name: "car_fetch_attempts",
Help: "The number of times a CAR fetch was attempted by IPFSBackend.",
})
promReg.MustRegister(carFetchAttemptMetric)
registerMetric(promReg, carFetchAttemptMetric)

contextAlreadyCancelledMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_car_backend",
Name: "car_fetch_context_already_cancelled",
Help: "The number of times context is already cancelled when a CAR fetch was attempted by IPFSBackend.",
})
promReg.MustRegister(contextAlreadyCancelledMetric)
registerMetric(promReg, contextAlreadyCancelledMetric)

// How many blocks were read via CARs?
// Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts.
Expand All @@ -147,15 +152,15 @@ func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics
Name: "car_blocks_fetched",
Help: "The number of blocks successfully read via CAR fetch.",
})
promReg.MustRegister(carBlocksFetchedMetric)
registerMetric(promReg, carBlocksFetchedMetric)

carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_car_backend",
Name: "car_fetch_params",
Help: "How many times specific CAR parameter was used during CAR data fetch.",
}, []string{"dagScope", "entityRanges"}) // we use 'ranges' instead of 'bytes' here because we only count the number of ranges present
promReg.MustRegister(carParamsMetric)
registerMetric(promReg, carParamsMetric)

bytesRangeStartMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -164,7 +169,7 @@ func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics
Help: "Tracks where did the range request start.",
Buckets: prometheus.ExponentialBuckets(1024, 2, 24), // 1024 bytes to 8 GiB
})
promReg.MustRegister(bytesRangeStartMetric)
registerMetric(promReg, bytesRangeStartMetric)

bytesRangeSizeMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -173,7 +178,7 @@ func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics
Help: "Tracks the size of range requests.",
Buckets: prometheus.ExponentialBuckets(256*1024, 2, 10), // From 256KiB to 100MiB
})
promReg.MustRegister(bytesRangeSizeMetric)
registerMetric(promReg, bytesRangeSizeMetric)

return &CarBackendMetrics{
contextAlreadyCancelledMetric,
Expand Down
16 changes: 4 additions & 12 deletions gateway/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@ var _ blockstore.Blockstore = (*cacheBlockStore)(nil)
// in memory using a two queue cache. It can be useful, for example, when paired
// with a proxy blockstore (see [NewRemoteBlockstore]).
//
// If the given [prometheus.Registerer] is nil, a new one will be created using
// [prometheus.NewRegistry].
// If the given [prometheus.Registerer] is nil, a [prometheus.DefaultRegisterer] will be used.
func NewCacheBlockStore(size int, reg prometheus.Registerer) (blockstore.Blockstore, error) {
c, err := lru.New2Q[string, []byte](size)
if err != nil {
return nil, err
}

if reg == nil {
reg = prometheus.NewRegistry()
reg = prometheus.DefaultRegisterer
}

cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -61,15 +60,8 @@ func NewCacheBlockStore(size int, reg prometheus.Registerer) (blockstore.Blockst
Help: "The number of global block cache requests.",
})

err = reg.Register(cacheHitsMetric)
if err != nil {
return nil, err
}

err = reg.Register(cacheRequestsMetric)
if err != nil {
return nil, err
}
registerMetric(reg, cacheHitsMetric)
registerMetric(reg, cacheRequestsMetric)

return &cacheBlockStore{
cache: c,
Expand Down
11 changes: 11 additions & 0 deletions gateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,14 @@ var tracer = otel.Tracer("boxo/gateway")
func spanTrace(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return tracer.Start(ctx, "Gateway."+spanName, opts...)
}

// registerMetric registers metrics in registry or logs an error.
//
// Registration may error if metric is alreadyregistered. we are not using
// MustRegister here to allow people to run tests in parallel without having to
// write tedious glue code that creates unique registry for each unit test
func registerMetric(registry prometheus.Registerer, metric prometheus.Collector) {
if err := registry.Register(metric); err != nil {
log.Errorf("failed to register %v: %v", metric, err)
}
}
18 changes: 16 additions & 2 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"mime"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -152,14 +153,25 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
}

if server.promRegistry == nil {
server.promRegistry = prometheus.NewRegistry()
server.promRegistry = prometheus.DefaultRegisterer
}

// Workaround due to https://github.com/slok/go-http-metrics
// using egistry.MustRegister internally.
// In production there will be only one handler, however we append counter
// to ensure duplicate metric registration will not panic in parallel tests
// when global prometheus.DefaultRegisterer is used.
metricsPrefix := "delegated_routing_server"
c := handlerCount.Add(1)
if c > 1 {
metricsPrefix = fmt.Sprintf("%s_%d", metricsPrefix, c)
}

// Create middleware with prometheus recorder
mdlw := middleware.New(middleware.Config{
Recorder: metrics.NewRecorder(metrics.Config{
Registry: server.promRegistry,
Prefix: "delegated_routing_server",
Prefix: metricsPrefix,
SizeBuckets: prometheus.ExponentialBuckets(100, 4, 8), // [100 400 1600 6400 25600 102400 409600 1.6384e+06]
DurationBuckets: []float64{0.1, 0.5, 1, 2, 5, 8, 10, 20, 30},
}),
Expand All @@ -176,6 +188,8 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
return r
}

var handlerCount atomic.Int32

type server struct {
svc ContentRouter
disableNDJSON bool
Expand Down

0 comments on commit cdefbf2

Please sign in to comment.