Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ packages:
filename: external/hstore.go
github.com/evstack/ev-node/block/internal/syncing:
interfaces:
daRetriever:
DARetriever:
config:
dir: ./block/internal/syncing
pkgname: syncing
filename: syncer_mock.go
filename: da_retriever_mock.go
p2pHandler:
config:
dir: ./block/internal/syncing
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Rename `evm-single` to `evm` and `grpc-single` to `evgrpc` for clarity. [#2839](https://github.com/evstack/ev-node/pull/2839)
- Split cache interface in `CacheManager` and `PendingManager` and create `da` client to easy DA handling. [#2878](https://github.com/evstack/ev-node/pull/2878)

## v1.0.0-beta.10

Expand Down
13 changes: 8 additions & 5 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ func NewSyncComponents(
return nil, fmt.Errorf("failed to create cache manager: %w", err)
}

daClient := NewDAClient(da, config, logger)

// error channel for critical failures
errorCh := make(chan error, 1)

syncer := syncing.NewSyncer(
store,
exec,
da,
daClient,
cacheManager,
metrics,
config,
Expand All @@ -162,8 +164,8 @@ func NewSyncComponents(
errorCh,
)

// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
// Create submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down Expand Up @@ -243,8 +245,9 @@ func NewAggregatorComponents(
return nil, fmt.Errorf("failed to create reaper: %w", err)
}

// Create DA submitter for aggregator nodes (with signer for submission)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
// Create DA client and submitter for aggregator nodes (with signer for submission)
daClient := NewDAClient(da, config, logger)
daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down
81 changes: 71 additions & 10 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func registerGobTypes() {
})
}

// Manager provides centralized cache management for both executing and syncing components
type Manager interface {
// CacheManager provides centralized cache management for both executing and syncing components
type CacheManager interface {
// Header operations
IsHeaderSeen(hash string) bool
SetHeaderSeen(hash string, blockHeight uint64)
Expand All @@ -62,14 +62,6 @@ type Manager interface {
SetTxSeen(hash string)
CleanupOldTxs(olderThan time.Duration) int

// Pending operations
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
NumPendingHeaders() uint64
NumPendingData() uint64

// Pending events syncing coordination
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
Expand All @@ -83,6 +75,22 @@ type Manager interface {
DeleteHeight(blockHeight uint64)
}

// PendingManager provides operations for managing pending headers and data
type PendingManager interface {
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
NumPendingHeaders() uint64
NumPendingData() uint64
}

// Manager provides centralized cache management for both executing and syncing components
type Manager interface {
CacheManager
PendingManager
}

var _ Manager = (*implementation)(nil)

// implementation provides the concrete implementation of cache Manager
Expand All @@ -98,6 +106,59 @@ type implementation struct {
logger zerolog.Logger
}

// NewPendingManager creates a new pending manager instance
func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager, error) {
pendingHeaders, err := NewPendingHeaders(store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create pending headers: %w", err)
}

pendingData, err := NewPendingData(store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create pending data: %w", err)
}

return &implementation{
pendingHeaders: pendingHeaders,
pendingData: pendingData,
logger: logger,
}, nil
}

// NewCacheManager creates a new cache manager instance
func NewCacheManager(cfg config.Config, logger zerolog.Logger) (CacheManager, error) {
// Initialize caches
headerCache := NewCache[types.SignedHeader]()
dataCache := NewCache[types.Data]()
txCache := NewCache[struct{}]()
pendingEventsCache := NewCache[common.DAHeightEvent]()

registerGobTypes()
impl := &implementation{
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
txTimestamps: new(sync.Map),
pendingEventsCache: pendingEventsCache,
config: cfg,
logger: logger,
}

if cfg.ClearCache {
// Clear the cache from disk
if err := impl.ClearFromDisk(); err != nil {
logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache")
}
} else {
// Load existing cache from disk
if err := impl.LoadFromDisk(); err != nil {
logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache")
}
}

return impl, nil
}

// NewManager creates a new cache manager instance
func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Manager, error) {
// Initialize caches
Expand Down
Loading
Loading