Skip to content

Commit

Permalink
cleanup options
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Mar 26, 2024
1 parent a596fde commit 3a7937b
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 91 deletions.
35 changes: 35 additions & 0 deletions gateway/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,46 @@ import (
"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/go-cid"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/routing"
)

type backendOptions struct {
ns namesys.NameSystem
vs routing.ValueStore
r resolver.Resolver
}

// WithNameSystem sets the name system to use with the different backends. If not set
// it will use the default DNSLink resolver generated by [NewDNSResolver] along
// with any configured [routing.ValueStore].
func WithNameSystem(ns namesys.NameSystem) BackendOption {
return func(opts *backendOptions) error {
opts.ns = ns
return nil
}
}

// WithValueStore sets the [routing.ValueStore] to use with the different backends.
func WithValueStore(vs routing.ValueStore) BackendOption {
return func(opts *backendOptions) error {
opts.vs = vs
return nil
}
}

// WithResolver sets the [resolver.Resolver] to use with the different backends.
func WithResolver(r resolver.Resolver) BackendOption {
return func(opts *backendOptions) error {
opts.r = r
return nil
}
}

type BackendOption func(options *backendOptions) error

// baseBackend contains some common backend functionalities that are shared by
// different backend implementations.
type baseBackend struct {
Expand Down
50 changes: 2 additions & 48 deletions gateway/backend_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ipfs/boxo/ipld/merkledag"
ufile "github.com/ipfs/boxo/ipld/unixfs/file"
uio "github.com/ipfs/boxo/ipld/unixfs/io"
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/path/resolver"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -35,9 +34,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p/core/routing"
mc "github.com/multiformats/go-multicodec"
"github.com/prometheus/client_golang/prometheus"

// Ensure basic codecs are registered.
_ "github.com/ipld/go-ipld-prime/codec/cbor"
Expand All @@ -57,51 +54,8 @@ type BlocksBackend struct {

var _ IPFSBackend = (*BlocksBackend)(nil)

type blocksBackendOptions struct {
ns namesys.NameSystem
vs routing.ValueStore
r resolver.Resolver
promRegistry prometheus.Registerer
}

// WithNameSystem sets the name system to use with the [BlocksBackend]. If not set
// it will use the default DNSLink resolver generated by [NewDNSResolver] along
// with any configured [routing.ValueStore].
func WithNameSystem(ns namesys.NameSystem) BlocksBackendOption {
return func(opts *blocksBackendOptions) error {
opts.ns = ns
return nil
}
}

// WithValueStore sets the [routing.ValueStore] to use with the [BlocksBackend].
func WithValueStore(vs routing.ValueStore) BlocksBackendOption {
return func(opts *blocksBackendOptions) error {
opts.vs = vs
return nil
}
}

// WithResolver sets the [resolver.Resolver] to use with the [BlocksBackend].
func WithResolver(r resolver.Resolver) BlocksBackendOption {
return func(opts *blocksBackendOptions) error {
opts.r = r
return nil
}
}

// WithPrometheusRegistry sets the registry to use for metrics collection.
func WithPrometheusRegistry(reg prometheus.Registerer) BlocksBackendOption {
return func(opts *blocksBackendOptions) error {
opts.promRegistry = reg
return nil
}
}

type BlocksBackendOption func(options *blocksBackendOptions) error

func NewBlocksBackend(blockService blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) {
var compiledOptions blocksBackendOptions
func NewBlocksBackend(blockService blockservice.BlockService, opts ...BackendOption) (*BlocksBackend, error) {
var compiledOptions backendOptions
for _, o := range opts {
if err := o(&compiledOptions); err != nil {
return nil, err
Expand Down
55 changes: 25 additions & 30 deletions gateway/backend_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ type CarFetcher interface {
Fetch(ctx context.Context, path string, cb DataCallback) error
}

type GraphGateway struct {
type GraphBackend struct {
baseBackend
fetcher CarFetcher

pc traversal.LinkTargetNodePrototypeChooser

metrics *GraphGatewayMetrics
metrics *GraphBackendMetrics
}

type GraphGatewayMetrics struct {
type GraphBackendMetrics struct {
contextAlreadyCancelledMetric prometheus.Counter
carFetchAttemptMetric prometheus.Counter
carBlocksFetchedMetric prometheus.Counter
Expand All @@ -65,8 +65,8 @@ type GraphGatewayMetrics struct {
bytesRangeSizeMetric prometheus.Histogram
}

func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGateway, error) {
var compiledOptions blocksBackendOptions
func NewGraphBackend(f CarFetcher, opts ...BackendOption) (*GraphBackend, error) {
var compiledOptions backendOptions
for _, o := range opts {
if err := o(&compiledOptions); err != nil {
return nil, err
Expand All @@ -80,15 +80,10 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGa
return nil, err
}

var promReg prometheus.Registerer = prometheus.NewRegistry()
if compiledOptions.promRegistry != nil {
promReg = compiledOptions.promRegistry
}

return &GraphGateway{
return &GraphBackend{
baseBackend: baseBackend,
fetcher: f,
metrics: registerGraphGatewayMetrics(promReg),
metrics: registerGraphBackendMetrics(),
pc: dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
Expand All @@ -98,7 +93,7 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGa
}, nil
}

func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGatewayMetrics {
func registerGraphBackendMetrics() *GraphBackendMetrics {
// How many CAR Fetch attempts we had? Need this to calculate % of various graph request types.
// We only count attempts here, because success/failure with/without retries are provided by caboose:
// - ipfs_caboose_fetch_duration_car_success_count
Expand All @@ -111,15 +106,15 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway
Name: "car_fetch_attempts",
Help: "The number of times a CAR fetch was attempted by IPFSBackend.",
})
registerer.MustRegister(carFetchAttemptMetric)
prometheus.MustRegister(carFetchAttemptMetric)

contextAlreadyCancelledMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "car_fetch_context_already_cancelled",
Help: "The number of times context is already cancelled when a CAR fetch was attempted by IPFSBackend.",
})
registerer.MustRegister(contextAlreadyCancelledMetric)
prometheus.MustRegister(contextAlreadyCancelledMetric)

// How many blocks were read via CARs?
// Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts.
Expand All @@ -129,15 +124,15 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway
Name: "car_blocks_fetched",
Help: "The number of blocks successfully read via CAR fetch.",
})
registerer.MustRegister(carBlocksFetchedMetric)
prometheus.MustRegister(carBlocksFetchedMetric)

carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "car_fetch_params",
Help: "How many times specific CAR parameter was used during CAR data fetch.",
}, []string{"dagScope", "entityRanges"}) // we use 'ranges' instead of 'bytes' here because we only count the number of ranges present
registerer.MustRegister(carParamsMetric)
prometheus.MustRegister(carParamsMetric)

bytesRangeStartMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -146,7 +141,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway
Help: "Tracks where did the range request start.",
Buckets: prometheus.ExponentialBuckets(1024, 2, 24), // 1024 bytes to 8 GiB
})
registerer.MustRegister(bytesRangeStartMetric)
prometheus.MustRegister(bytesRangeStartMetric)

bytesRangeSizeMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -155,9 +150,9 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway
Help: "Tracks the size of range requests.",
Buckets: prometheus.ExponentialBuckets(256*1024, 2, 10), // From 256KiB to 100MiB
})
registerer.MustRegister(bytesRangeSizeMetric)
prometheus.MustRegister(bytesRangeSizeMetric)

return &GraphGatewayMetrics{
return &GraphBackendMetrics{
contextAlreadyCancelledMetric,
carFetchAttemptMetric,
carBlocksFetchedMetric,
Expand All @@ -167,7 +162,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway
}
}

func (api *GraphGateway) fetchCAR(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error {
func (api *GraphBackend) fetchCAR(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error {
urlWithoutHost := contentPathToCarUrl(path, params).String()

api.metrics.carFetchAttemptMetric.Inc()
Expand Down Expand Up @@ -305,7 +300,7 @@ func contentMetadataFromRootsAndRemainder(p path.ImmutablePath, pathRoots []cid.

var errNotUnixFS = fmt.Errorf("data was not unixfs")

func (api *GraphGateway) Get(ctx context.Context, path path.ImmutablePath, byteRanges ...ByteRange) (ContentPathMetadata, *GetResponse, error) {
func (api *GraphBackend) Get(ctx context.Context, path path.ImmutablePath, byteRanges ...ByteRange) (ContentPathMetadata, *GetResponse, error) {
rangeCount := len(byteRanges)
api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": strconv.Itoa(rangeCount)}).Inc()

Expand Down Expand Up @@ -681,7 +676,7 @@ func (it *backpressuredHAMTDirIterNoRecursion) Err() error {

var _ AwaitCloser = (*backpressuredHAMTDirIterNoRecursion)(nil)

func (api *GraphGateway) GetAll(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.Node, error) {
func (api *GraphBackend) GetAll(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.Node, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc()
return fetchWithPartialRetries(ctx, path, CarParams{Scope: DagScopeAll}, loadTerminalUnixFSElementWithRecursiveDirectories, api.metrics, api.fetchCAR)
}
Expand All @@ -700,7 +695,7 @@ type nextReq struct {
params CarParams
}

func fetchWithPartialRetries[T any](ctx context.Context, p path.ImmutablePath, initialParams CarParams, resolveTerminalElementFn loadTerminalElement[T], metrics *GraphGatewayMetrics, fetchCAR fetchCarFn) (ContentPathMetadata, T, error) {
func fetchWithPartialRetries[T any](ctx context.Context, p path.ImmutablePath, initialParams CarParams, resolveTerminalElementFn loadTerminalElement[T], metrics *GraphBackendMetrics, fetchCAR fetchCarFn) (ContentPathMetadata, T, error) {
var zeroReturnType T

terminalPathElementCh := make(chan terminalPathType[T], 1)
Expand Down Expand Up @@ -848,7 +843,7 @@ func fetchWithPartialRetries[T any](ctx context.Context, p path.ImmutablePath, i
}
}

func (api *GraphGateway) GetBlock(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, files.File, error) {
func (api *GraphBackend) GetBlock(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, files.File, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc()

var md ContentPathMetadata
Expand Down Expand Up @@ -895,7 +890,7 @@ func (api *GraphGateway) GetBlock(ctx context.Context, p path.ImmutablePath) (Co
return md, f, nil
}

func (api *GraphGateway) Head(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, *HeadResponse, error) {
func (api *GraphBackend) Head(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, *HeadResponse, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": "1"}).Inc()

// TODO: we probably want to move this either to boxo, or at least to loadRequestIntoSharedBlockstoreAndBlocksGateway
Expand Down Expand Up @@ -1037,7 +1032,7 @@ func (api *GraphGateway) Head(ctx context.Context, p path.ImmutablePath) (Conten
return md, n, nil
}

func (api *GraphGateway) ResolvePath(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, error) {
func (api *GraphBackend) ResolvePath(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc()

var md ContentPathMetadata
Expand Down Expand Up @@ -1066,7 +1061,7 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, p path.ImmutablePath)
return md, nil
}

func (api *GraphGateway) GetCAR(ctx context.Context, p path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) {
func (api *GraphBackend) GetCAR(ctx context.Context, p path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) {
numRanges := "0"
if params.Range != nil {
numRanges = "1"
Expand Down Expand Up @@ -1175,11 +1170,11 @@ func getRootCid(imPath path.ImmutablePath) (cid.Cid, error) {
return rootCid, nil
}

func (api *GraphGateway) IsCached(ctx context.Context, path path.Path) bool {
func (api *GraphBackend) IsCached(ctx context.Context, path path.Path) bool {
return false
}

var _ IPFSBackend = (*GraphGateway)(nil)
var _ IPFSBackend = (*GraphBackend)(nil)

func checkRetryableError(e *error, fn func() error) error {
err := fn()
Expand Down
Loading

0 comments on commit 3a7937b

Please sign in to comment.