Skip to content

Commit 837a887

Browse files
committed
Merge branch 'main' into alex/cache_io
* main: refactor: use state da height as well (#2872) refactor: retrieve highest da height in cache (#2870) chore: change from event count to start and end height (#2871)
2 parents 64b81ec + 9a5eba1 commit 837a887

File tree

7 files changed

+93
-10
lines changed

7 files changed

+93
-10
lines changed

block/internal/cache/generic_cache.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111

1212
"golang.org/x/sync/errgroup"
13+
"sync/atomic"
1314
)
1415

1516
// Cache is a generic cache that maintains items that are seen and hard confirmed
@@ -22,6 +23,8 @@ type Cache[T any] struct {
2223
daIncluded *sync.Map
2324
// hashByHeight tracks the hash associated with each height for pruning
2425
hashByHeight *sync.Map
26+
// maxDAHeight tracks the maximum DA height seen
27+
maxDAHeight *atomic.Uint64
2528
}
2629

2730
// NewCache returns a new Cache struct
@@ -31,6 +34,7 @@ func NewCache[T any]() *Cache[T] {
3134
hashes: new(sync.Map),
3235
daIncluded: new(sync.Map),
3336
hashByHeight: new(sync.Map),
37+
maxDAHeight: &atomic.Uint64{},
3438
}
3539
}
3640

@@ -95,13 +99,25 @@ func (c *Cache[T]) getDAIncluded(hash string) (uint64, bool) {
9599
func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) {
96100
c.daIncluded.Store(hash, daHeight)
97101
c.hashByHeight.Store(blockHeight, hash)
102+
103+
// Update max DA height if necessary
104+
current := c.maxDAHeight.Load()
105+
if daHeight >= current {
106+
_ = c.maxDAHeight.CompareAndSwap(current, daHeight)
107+
}
98108
}
99109

100110
// removeDAIncluded removes the DA-included status of the hash
101111
func (c *Cache[T]) removeDAIncluded(hash string) {
102112
c.daIncluded.Delete(hash)
103113
}
104114

115+
// daHeight returns the maximum DA height from all DA-included items.
116+
// Returns 0 if no items are DA-included.
117+
func (c *Cache[T]) daHeight() uint64 {
118+
return c.maxDAHeight.Load()
119+
}
120+
105121
// deleteAllForHeight removes all items and their associated data from the cache at the given height
106122
func (c *Cache[T]) deleteAllForHeight(height uint64) {
107123
c.itemsByHeight.Delete(height)
@@ -255,8 +271,12 @@ func (c *Cache[T]) LoadFromDisk(folderPath string) error {
255271
}
256272
for k, v := range daIncludedMap {
257273
c.daIncluded.Store(k, v)
274+
// Update max DA height during load
275+
current := c.maxDAHeight.Load()
276+
if v > current {
277+
_ = c.maxDAHeight.CompareAndSwap(current, v)
278+
}
258279
}
259-
260280
return nil
261281
})
262282
return wg.Wait()

block/internal/cache/generic_cache_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,47 @@ func init() {
1414
gob.Register(&testItem{})
1515
}
1616

17+
// TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height
18+
func TestCache_MaxDAHeight(t *testing.T) {
19+
c := NewCache[testItem]()
20+
21+
// Initially should be 0
22+
if got := c.daHeight(); got != 0 {
23+
t.Errorf("initial daHeight = %d, want 0", got)
24+
}
25+
26+
// Set items with increasing DA heights
27+
c.setDAIncluded("hash1", 100, 1)
28+
if got := c.daHeight(); got != 100 {
29+
t.Errorf("after setDAIncluded(100): daHeight = %d, want 100", got)
30+
}
31+
32+
c.setDAIncluded("hash2", 50, 2) // Lower height shouldn't change max
33+
if got := c.daHeight(); got != 100 {
34+
t.Errorf("after setDAIncluded(50): daHeight = %d, want 100", got)
35+
}
36+
37+
c.setDAIncluded("hash3", 200, 3)
38+
if got := c.daHeight(); got != 200 {
39+
t.Errorf("after setDAIncluded(200): daHeight = %d, want 200", got)
40+
}
41+
42+
// Test persistence
43+
dir := t.TempDir()
44+
if err := c.SaveToDisk(dir); err != nil {
45+
t.Fatalf("SaveToDisk failed: %v", err)
46+
}
47+
48+
c2 := NewCache[testItem]()
49+
if err := c2.LoadFromDisk(dir); err != nil {
50+
t.Fatalf("LoadFromDisk failed: %v", err)
51+
}
52+
53+
if got := c2.daHeight(); got != 200 {
54+
t.Errorf("after load: daHeight = %d, want 200", got)
55+
}
56+
}
57+
1758
// TestCache_SaveLoad_ErrorPaths covers SaveToDisk and LoadFromDisk error scenarios.
1859
func TestCache_SaveLoad_ErrorPaths(t *testing.T) {
1960
c := NewCache[testItem]()

block/internal/cache/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Manager interface {
4949
GetHeaderDAIncluded(hash string) (uint64, bool)
5050
SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64)
5151
RemoveHeaderDAIncluded(hash string)
52+
DaHeight() uint64
5253

5354
// Data operations
5455
IsDataSeen(hash string) bool
@@ -165,6 +166,11 @@ func (m *implementation) RemoveHeaderDAIncluded(hash string) {
165166
m.headerCache.removeDAIncluded(hash)
166167
}
167168

169+
// DaHeight fetches the heights da height contained in the processed cache.
170+
func (m *implementation) DaHeight() uint64 {
171+
return max(m.headerCache.daHeight(), m.dataCache.daHeight())
172+
}
173+
168174
// Data operations
169175
func (m *implementation) IsDataSeen(hash string) bool {
170176
return m.dataCache.isSeen(hash)

block/internal/syncing/da_retriever.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,18 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
214214
}
215215

216216
if len(events) > 0 {
217-
r.logger.Info().Int("count", len(events)).Uint64("da_height", daHeight).Msg("processed blocks from DA")
217+
startHeight := events[0].Header.Height()
218+
endHeight := events[0].Header.Height()
219+
for _, event := range events {
220+
h := event.Header.Height()
221+
if h < startHeight {
222+
startHeight = h
223+
}
224+
if h > endHeight {
225+
endHeight = h
226+
}
227+
}
228+
r.logger.Info().Uint64("da_height", daHeight).Uint64("start_height", startHeight).Uint64("end_height", endHeight).Msg("processed blocks from DA")
218229
}
219230

220231
return events

block/internal/syncing/syncer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (s *Syncer) SetLastState(state types.State) {
177177

178178
// GetDAHeight returns the current DA height
179179
func (s *Syncer) GetDAHeight() uint64 {
180-
return s.daHeight.Load()
180+
return max(s.daHeight.Load(), s.cache.DaHeight())
181181
}
182182

183183
// SetDAHeight updates the DA height
@@ -217,7 +217,9 @@ func (s *Syncer) initializeState() error {
217217
s.SetLastState(state)
218218

219219
// Set DA height
220-
s.SetDAHeight(state.DAHeight)
220+
// we get the max from the genesis da height, the state da height and the cache (fetched) da height
221+
// if a user has messed up and sync da too far ahead, on restart they can clear the cache (--clear-cache) and the retrieve will restart fetching from the last known block synced and executed from DA or the set genesis da height.
222+
s.SetDAHeight(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
221223

222224
s.logger.Info().
223225
Uint64("height", state.LastBlockHeight).

block/internal/syncing/syncer_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,6 @@ func TestSyncLoopPersistState(t *testing.T) {
433433

434434
t.Log("sync workers on instance1 completed")
435435
require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight())
436-
lastStateDAHeight := syncerInst1.GetLastState().DAHeight
437436

438437
// wait for all events consumed
439438
require.NoError(t, cacheMgr.SaveToDisk())
@@ -470,7 +469,6 @@ func TestSyncLoopPersistState(t *testing.T) {
470469
make(chan error, 1),
471470
)
472471
require.NoError(t, syncerInst2.initializeState())
473-
require.Equal(t, lastStateDAHeight, syncerInst2.GetDAHeight())
474472

475473
ctx, cancel = context.WithCancel(t.Context())
476474
t.Cleanup(cancel)
@@ -484,7 +482,7 @@ func TestSyncLoopPersistState(t *testing.T) {
484482
Run(func(arg mock.Arguments) {
485483
cancel()
486484
// retrieve last one again
487-
assert.Equal(t, lastStateDAHeight, arg.Get(1).(uint64))
485+
assert.Equal(t, syncerInst2.GetDAHeight(), arg.Get(1).(uint64))
488486
}).
489487
Return(nil, nil)
490488

@@ -589,6 +587,11 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
589587
// This test verifies that initializeState() invokes Replayer.
590588
// The detailed replay logic is tested in block/internal/common/replay_test.go
591589

590+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
591+
st := store.New(ds)
592+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
593+
require.NoError(t, err)
594+
592595
// Create mocks
593596
mockStore := testmocks.NewMockStore(t)
594597
mockExec := testmocks.NewMockHeightAwareExecutor(t)
@@ -627,17 +630,17 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
627630
daHeight: &atomic.Uint64{},
628631
logger: zerolog.Nop(),
629632
ctx: context.Background(),
633+
cache: cm,
630634
}
631635

632636
// Initialize state - this should call Replayer
633-
err := syncer.initializeState()
637+
err = syncer.initializeState()
634638
require.NoError(t, err)
635639

636640
// Verify state was initialized correctly
637641
state := syncer.GetLastState()
638642
assert.Equal(t, storeHeight, state.LastBlockHeight)
639643
assert.Equal(t, gen.ChainID, state.ChainID)
640-
assert.Equal(t, uint64(5), syncer.GetDAHeight())
641644

642645
// Verify that GetLatestHeight was called (proves Replayer was invoked)
643646
mockExec.AssertCalled(t, "GetLatestHeight", mock.Anything)

tools/da-debug/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ da-debug search 100 "0x000000000000000000000000000000000000000000000000000000746
7474
All commands support these global flags:
7575

7676
<!-- markdown-link-check-disable -->
77-
- `--da-url string`: DA layer JSON-RPC URL (default: "http://localhost:7980")
77+
- `--da-url string`: DA layer JSON-RPC URL (default: "<http://localhost:7980>")
7878
<!-- markdown-link-check-enable -->
7979
- `--auth-token string`: Authentication token for DA layer
8080
- `--timeout duration`: Request timeout (default: 30s)

0 commit comments

Comments
 (0)