share/availability/full;nodebuilder/pruner: Implement archival trimming
renaynay committed Jan 9, 2025
1 parent 4d3d3c2 commit 9bf0217
Showing 11 changed files with 154 additions and 127 deletions.
85 changes: 30 additions & 55 deletions nodebuilder/pruner/module.go
@@ -2,18 +2,15 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
@@ -23,12 +20,6 @@ import (
var log = logging.Logger("module/pruner")

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
@@ -42,51 +33,56 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Invoke(func(ctx context.Context, ds datastore.Batching, p pruner.Pruner) error {
return pruner.DetectPreviousRun(ctx, ds, p.Kind())
}),
)

baseComponents := fx.Options(
fx.Supply(cfg),
// TODO @renaynay: move this to share module construction
fx.Supply(modshare.Window(availability.StorageWindow)),
advertiseArchival(tp, cfg),
prunerService,
)

switch tp {
case node.Light:
// LNs enforce pruning by default
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
case node.Full:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
coreOpts := make([]core.Option, 0)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
coreOpts = []core.Option{core.WithArchivalMode()}
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
)
default:
panic("unknown node type")
@@ -102,24 +98,3 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
return opt
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return modshare.Window(availability.StorageWindow)
}
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
}
}
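
For context, the switch between pruned and archival behavior above is driven entirely by the pruner config flag; the old availWindow helper is gone because the storage window is now always supplied. A minimal sketch of the operator-facing side, mirroring the config usage in nodebuilder/tests/prune_test.go below (illustration only, not part of the commit):

package main

import (
	"fmt"

	"github.com/celestiaorg/celestia-node/nodebuilder"
	"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

func main() {
	// With EnableService set to false, ConstructModule supplies
	// fullavail.WithArchivalMode() (plus core.WithArchivalMode() on Bridge nodes),
	// so the node retains historic blocks instead of trimming them.
	cfg := nodebuilder.DefaultConfig(node.Full)
	cfg.Pruner.EnableService = false
	fmt.Println("pruning enabled:", cfg.Pruner.EnableService)
}

Once a node has run with EnableService=true, the checkpoint guard in DetectPreviousRun (see pruner/checkpoint.go below) rejects a later switch back to archival mode.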
4 changes: 3 additions & 1 deletion nodebuilder/share/module.go
@@ -239,7 +239,9 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(full.NewShareAvailability),
fx.Provide(func(s *store.Store, getter shwap.Getter, opts []full.Option) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
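
The new wrapper forwards the []full.Option slice supplied by the pruner module into full.NewShareAvailability's variadic parameter. A generic sketch of this supply-and-adapt pattern in fx, with hypothetical stand-in names rather than the real share module types:

package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Option stands in for full.Option in this illustration.
type Option func(*availability)

type availability struct{ archival bool }

// withArchivalMode plays the role of fullavail.WithArchivalMode.
func withArchivalMode() Option { return func(a *availability) { a.archival = true } }

// newAvailability mirrors the variadic shape of full.NewShareAvailability.
func newAvailability(opts ...Option) *availability {
	a := &availability{}
	for _, opt := range opts {
		opt(a)
	}
	return a
}

func main() {
	app := fx.New(
		fx.Supply([]Option{withArchivalMode()}), // what the prune module supplies
		fx.Provide(func(opts []Option) *availability { return newAvailability(opts...) }),
		fx.Invoke(func(a *availability) { fmt.Println("archival:", a.archival) }),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}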
9 changes: 1 addition & 8 deletions nodebuilder/tests/prune_test.go
@@ -1,5 +1,3 @@
//go:build pruning || integration

package tests

import (
@@ -41,11 +39,6 @@ import (
// spin up 3 pruning FNs, connect
// spin up 1 LN that syncs historic blobs
func TestArchivalBlobSync(t *testing.T) {
if testing.Short() {
// TODO: https://github.com/celestiaorg/celestia-node/issues/3636
t.Skip()
}

const (
blocks = 50
btime = time.Millisecond * 300
@@ -181,7 +174,7 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

// Light nodes are allowed to disable pruning in wish
// Light nodes have pruning enabled by default
for _, nt := range []node.Type{node.Bridge, node.Full} {
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
20 changes: 0 additions & 20 deletions pruner/archival/pruner.go

This file was deleted.

42 changes: 31 additions & 11 deletions pruner/checkpoint.go
@@ -27,21 +27,44 @@ var (
// checkpoint contains information related to the state of the
// pruner service that is periodically persisted to disk.
type checkpoint struct {
PrunerKind string `json:"pruner_kind"`
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

// DetectPreviousRun checks if the pruner has run before by checking for the existence of a
// checkpoint.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore) error {
_, err := getCheckpoint(ctx, namespace.Wrap(ds, storePrefix))
if errors.Is(err, errCheckpointNotFound) {
return nil
func newCheckpoint(prunerKind string) *checkpoint {
return &checkpoint{
PrunerKind: prunerKind,
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
}

// DetectPreviousRun ensures that a node that has been run with "full" pruning
// mode previously cannot revert back to an "archival" one. This check should
// only be performed when a node is either a Full or Bridge node.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore, expectedKind string) error {
wrappedDs := namespace.Wrap(ds, storePrefix)

cp, err := getCheckpoint(ctx, wrappedDs)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
return nil
}
return fmt.Errorf("failed to load checkpoint: %w", err)
}
return ErrDisallowRevertToArchival

if cp.PrunerKind != expectedKind {
// do not allow reversion back to archival mode
if cp.PrunerKind == "full" {
return ErrDisallowRevertToArchival
}
// allow conversion from archival to full by overriding previous checkpoint
log.Infow("overriding checkpoint to enable full pruning mode...")
cp = newCheckpoint(expectedKind)
return storeCheckpoint(ctx, wrappedDs, cp)
}
return nil
}

// storeCheckpoint persists the checkpoint to disk.
@@ -78,10 +101,7 @@ func (s *Service) loadCheckpoint(ctx context.Context) error {
cp, err := getCheckpoint(ctx, s.ds)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
s.checkpoint = &checkpoint{
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
s.checkpoint = newCheckpoint(s.pruner.Kind())
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}
return err
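
For reference, the checkpoint above persists as JSON via the struct tags shown in the diff. A small sketch (the struct is copied here only for illustration, since the real type is unexported) of what a freshly initialized "full" checkpoint serializes to:

package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint mirrors the fields and tags from pruner/checkpoint.go above.
type checkpoint struct {
	PrunerKind       string              `json:"pruner_kind"`
	LastPrunedHeight uint64              `json:"last_pruned_height"`
	FailedHeaders    map[uint64]struct{} `json:"failed"`
}

func main() {
	// Equivalent of newCheckpoint("full"): the state written on a pruned node's first run.
	cp := checkpoint{PrunerKind: "full", LastPrunedHeight: 1, FailedHeaders: map[uint64]struct{}{}}
	out, _ := json.Marshal(cp)
	fmt.Println(string(out)) // {"pruner_kind":"full","last_pruned_height":1,"failed":{}}
}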
63 changes: 63 additions & 0 deletions pruner/checkpoint_test.go
@@ -5,14 +5,17 @@ import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStoreCheckpoint(t *testing.T) {
ctx := context.Background()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
c := &checkpoint{
PrunerKind: "test",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}
@@ -24,3 +27,63 @@ func TestStoreCheckpoint(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c, c2)
}

// TestDisallowRevertArchival tests that a node that has been previously run
// with full pruning cannot convert back into an "archival" node.
func TestDisallowRevertArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "full",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "archival")
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)

// ensure no false positives
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

// ensure checkpoint is retrievable after
cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
require.NotNil(t, cp)
assert.Equal(t, cp.LastPrunedHeight, c.LastPrunedHeight)
}

func TestCheckpointOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "archival",
LastPrunedHeight: 600,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
assert.Equal(t, "full", cp.PrunerKind)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
32 changes: 0 additions & 32 deletions pruner/full/pruner.go

This file was deleted.

1 change: 1 addition & 0 deletions pruner/pruner.go
@@ -10,4 +10,5 @@ import (
// from the node's datastore.
type Pruner interface {
Prune(context.Context, *header.ExtendedHeader) error
Kind() string
}
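
With the interface extended, every Pruner also reports its kind, which DetectPreviousRun records in the checkpoint's PrunerKind field; the real implementations are the ShareAvailability types wired up in nodebuilder/pruner/module.go above. A hypothetical minimal implementation, purely to illustrate the new shape (not part of this commit):

package pruner

import (
	"context"

	"github.com/celestiaorg/celestia-node/header"
)

// noopPruner is an illustrative example of the extended interface.
type noopPruner struct{}

// Prune discards nothing.
func (noopPruner) Prune(context.Context, *header.ExtendedHeader) error { return nil }

// Kind is persisted into the checkpoint and checked by DetectPreviousRun.
func (noopPruner) Kind() string { return "noop" }

// compile-time assertion that noopPruner satisfies Pruner.
var _ Pruner = noopPruner{}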
4 changes: 4 additions & 0 deletions pruner/service_test.go
@@ -318,6 +318,10 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
return nil
}

func (mp *mockPruner) Kind() string {
return "mock"
}

// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
// https://github.com/celestiaorg/celestia-node/issues/3278.
type SpacedHeaderGenerator struct {