Skip to content

Commit

Permalink
feat(taiko-client): improve progress tracker (#17281)
Browse files Browse the repository at this point in the history
Co-authored-by: maskpp <[email protected]>
  • Loading branch information
davidtaikocha and mask-pp authored May 26, 2024
1 parent 4011d3e commit 5d05226
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

var (
syncProgressCheckInterval = 12 * time.Second
gapToResync = new(big.Int).SetUint64(64)
)

// SyncProgressTracker is responsible for tracking the L2 execution engine's sync progress, after
Expand Down Expand Up @@ -162,16 +163,34 @@ func (t *SyncProgressTracker) ClearMeta() {
t.outOfSync = false
}

// HeadChanged checks if a new beacon sync request will be needed.
func (t *SyncProgressTracker) HeadChanged(newID *big.Int) bool {
// NeedReSync checks if a new beacon sync request will be needed:
// 1, if the beacon sync has not been triggered yet
// 2, if there is 64 blocks gap between the last head to sync and the new block
// 3, if the last triggered beacon sync is finished, but there are still new blocks
func (t *SyncProgressTracker) NeedReSync(newID *big.Int) bool {
t.mutex.RLock()
defer t.mutex.RUnlock()

// If the beacon sync has not been triggered yet, we will simply trigger it.
if !t.triggered {
return true
}

return t.lastSyncedBlockID != nil && t.lastSyncedBlockID != newID
if t.lastSyncedBlockID == nil {
return true
}

// If the new block is 64 blocks ahead of the last synced block, we will trigger a new beacon sync.
if new(big.Int).Sub(newID, t.lastSyncedBlockID).Cmp(gapToResync) >= 0 {
return true
}

// If the last triggered beacon sync is finished, we will trigger a new beacon sync.
if t.lastSyncProgress != nil && t.lastSyncProgress.CurrentBlock >= t.lastSyncedBlockID.Uint64() {
return true
}

return false
}

// OutOfSync tells whether the L2 execution engine is marked as out of sync.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (s *BeaconSyncProgressTrackerTestSuite) TestClearMeta() {
}

func (s *BeaconSyncProgressTrackerTestSuite) TestHeadChanged() {
s.True(s.t.HeadChanged(common.Big256))
s.True(s.t.NeedReSync(common.Big256))
s.t.triggered = true
s.False(s.t.HeadChanged(common.Big256))
s.False(s.t.NeedReSync(common.Big256))
}

func (s *BeaconSyncProgressTrackerTestSuite) TestOutOfSync() {
Expand Down
43 changes: 21 additions & 22 deletions packages/taiko-client/driver/chain_syncer/beaconsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,25 @@ func NewSyncer(
// TriggerBeaconSync triggers the L2 execution engine to start performing a beacon sync, if the
// latest verified block has changed.
func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
latestVerifiedHeadPayload, err := s.getVerifiedBlockPayload(s.ctx, blockID)
if err != nil {
return err
// If we don't need to trigger another beacon sync, just return.
if !s.progressTracker.NeedReSync(new(big.Int).SetUint64(blockID)) {
return nil
}

if !s.progressTracker.HeadChanged(new(big.Int).SetUint64(blockID)) {
log.Debug("Verified head has not changed", "blockID", blockID, "hash", latestVerifiedHeadPayload.BlockHash)
return nil
if s.progressTracker.Triggered() && s.progressTracker.lastSyncProgress == nil {
log.Info(
"Syncing beacon headers, please check L2 execution engine logs for progress",
"currentSyncHead", s.progressTracker.LastSyncedBlockID(),
"newBlockID", blockID,
)
}

if s.progressTracker.Triggered() {
if s.progressTracker.lastSyncProgress == nil {
log.Info(
"Syncing beacon headers, please check L2 execution engine logs for progress",
"currentSyncHead", s.progressTracker.LastSyncedBlockID(),
"newBlockID", blockID,
)
}
headPayload, err := s.getBlockPayload(s.ctx, blockID)
if err != nil {
return err
}

status, err := s.rpc.L2Engine.NewPayload(s.ctx, latestVerifiedHeadPayload)
status, err := s.rpc.L2Engine.NewPayload(s.ctx, headPayload)
if err != nil {
return err
}
Expand All @@ -69,9 +67,9 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
}

fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(s.ctx, &engine.ForkchoiceStateV1{
HeadBlockHash: latestVerifiedHeadPayload.BlockHash,
SafeBlockHash: latestVerifiedHeadPayload.BlockHash,
FinalizedBlockHash: latestVerifiedHeadPayload.BlockHash,
HeadBlockHash: headPayload.BlockHash,
SafeBlockHash: headPayload.BlockHash,
FinalizedBlockHash: headPayload.BlockHash,
}, nil)
if err != nil {
return err
Expand All @@ -81,7 +79,7 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
}

// Update sync status.
s.progressTracker.UpdateMeta(new(big.Int).SetUint64(blockID), latestVerifiedHeadPayload.BlockHash)
s.progressTracker.UpdateMeta(new(big.Int).SetUint64(blockID), headPayload.BlockHash)

log.Info(
"⛓️ Beacon sync triggered",
Expand All @@ -92,14 +90,15 @@ func (s *Syncer) TriggerBeaconSync(blockID uint64) error {
return nil
}

// getVerifiedBlockPayload fetches the latest verified block's header, and converts it to an Engine API executable data,
// getBlockPayload fetches the block's header, and converts it to an Engine API executable data,
// which will be used to let the node start beacon syncing.
func (s *Syncer) getVerifiedBlockPayload(ctx context.Context, blockID uint64) (*engine.ExecutableData, error) {
func (s *Syncer) getBlockPayload(ctx context.Context, blockID uint64) (*engine.ExecutableData, error) {
header, err := s.rpc.L2CheckPoint.HeaderByNumber(s.ctx, new(big.Int).SetUint64(blockID))
if err != nil {
return nil, err
}

// If the sync mode is `full`, we need to verify the protocol verified block hash before syncing.
if s.syncMode == downloader.FullSync.String() {
blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(blockID))
if err != nil {
Expand All @@ -118,7 +117,7 @@ func (s *Syncer) getVerifiedBlockPayload(ctx context.Context, blockID uint64) (*
}
}

log.Info("Latest verified block header retrieved", "hash", header.Hash())
log.Info("Block header to sync retrieved", "hash", header.Hash())

return encoding.ToExecutableData(header), nil
}
36 changes: 4 additions & 32 deletions packages/taiko-client/driver/chain_syncer/blob/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,10 @@ func (s *Syncer) insertNewHead(
return nil, fmt.Errorf("failed to create execution payloads: %w", err)
}

fc := &engine.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash}
if err = s.fillForkchoiceState(ctx, event, fc); err != nil {
return nil, err
fc := &engine.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.BlockHash,
FinalizedBlockHash: payload.BlockHash,
}

// Update the fork choice
Expand All @@ -402,35 +403,6 @@ func (s *Syncer) insertNewHead(
return payload, nil
}

// fillForkchoiceState fills the forkchoice state with the finalized block hash and the safe block hash.
func (s *Syncer) fillForkchoiceState(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
fc *engine.ForkchoiceStateV1,
) error {
// If the event is emitted from the genesis block, we don't need to fill the forkchoice state,
// should only happen when testing.
if event.Raw.BlockNumber == 0 {
return nil
}

// Fetch the latest verified block's header from protocol.
variables, err := s.rpc.GetTaikoDataSlotBByNumber(ctx, event.Raw.BlockNumber)
if err != nil {
return err
}
finalizeHeader, err := s.rpc.L2.HeaderByNumber(ctx, new(big.Int).SetUint64(variables.LastVerifiedBlockId))
if err != nil {
return err
}

// Fill the forkchoice state.
fc.FinalizedBlockHash = finalizeHeader.Hash()
fc.SafeBlockHash = finalizeHeader.ParentHash

return nil
}

// createExecutionPayloads creates a new execution payloads through
// Engine APIs.
func (s *Syncer) createExecutionPayloads(
Expand Down
34 changes: 18 additions & 16 deletions packages/taiko-client/driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func New(

// Sync performs a sync operation to L2 execution engine's local chain.
func (s *L2ChainSyncer) Sync() error {
blockID, needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered()
blockIDToSync, needNewBeaconSyncTriggered, err := s.needNewBeaconSyncTriggered()
if err != nil {
return err
}
// If current L2 execution engine's chain is behind of the protocol's latest verified block head, and the
// If current L2 execution engine's chain is behind of the block head to sync, and the
// `P2PSync` flag is set, try triggering a beacon sync in L2 execution engine to catch up the
// latest verified block head.
// head.
if needNewBeaconSyncTriggered {
if err := s.beaconSyncer.TriggerBeaconSync(blockID); err != nil {
if err := s.beaconSyncer.TriggerBeaconSync(blockIDToSync); err != nil {
return fmt.Errorf("trigger beacon sync error: %w", err)
}

Expand Down Expand Up @@ -138,27 +138,29 @@ func (s *L2ChainSyncer) Sync() error {
return s.blobSyncer.ProcessL1Blocks(s.ctx)
}

// AheadOfProtocolVerifiedHead checks whether the L2 chain is ahead of verified head in protocol.
func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint64) bool {
// AheadOfHeadToSync checks whether the L2 chain is ahead of the head to sync in protocol.
func (s *L2ChainSyncer) AheadOfHeadToSync(heightToSync uint64) bool {
log.Debug(
"Checking whether the execution engine is ahead of protocol's verified head",
"latestVerifiedBlock", verifiedHeightToCompare,
"Checking whether the execution engine is ahead of the head to sync",
"heightToSync", heightToSync,
"executionEngineHead", s.state.GetL2Head().Number,
)
if verifiedHeightToCompare > 0 {
// If latest verified head height is equal to L2 execution engine's synced head height minus one,
if heightToSync > 0 {
// If head height is equal to L2 execution engine's synced head height minus one,
// we also mark the triggered P2P sync progress as finished to prevent a potential `InsertBlockWithoutSetHead` in
// execution engine, which may cause errors since we do not pass all transactions in ExecutePayload when calling
// `NewPayloadV1`.
verifiedHeightToCompare--
heightToSync--
}

// If the L2 execution engine's chain is behind of the protocol's latest verified block head,
// If the L2 execution engine's chain is behind of the block head to sync,
// we should keep the beacon sync.
if s.state.GetL2Head().Number.Uint64() < verifiedHeightToCompare {
if s.state.GetL2Head().Number.Uint64() < heightToSync {
return false
}

// If the L2 execution engine's chain is ahead of the block head to sync,
// we can mark the beacon sync progress as finished.
if s.progressTracker.LastSyncedBlockID() != nil {
return s.state.GetL2Head().Number.Uint64() >= s.progressTracker.LastSyncedBlockID().Uint64()
}
Expand All @@ -171,15 +173,15 @@ func (s *L2ChainSyncer) AheadOfProtocolVerifiedHead(verifiedHeightToCompare uint
// 1. The `P2PSync` flag is set.
// 2. The protocol's latest verified block head is not zero.
// 3. The L2 execution engine's chain is behind of the protocol's latest verified block head.
// 4. The L2 execution engine's chain have met a sync timeout issue.
// 4. The L2 execution engine's chain has met a sync timeout issue.
func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (uint64, bool, error) {
// If the flag is not set or there was a finished beacon sync, we simply return false.
if !s.p2pSync || s.progressTracker.Finished() {
return 0, false, nil
}

// For full sync mode, we will use the verified block head,
// And for snap sync mode, we will use the latest block head.
// and for snap sync mode, we will use the latest block head.
var (
blockID uint64
err error
Expand All @@ -204,7 +206,7 @@ func (s *L2ChainSyncer) needNewBeaconSyncTriggered() (uint64, bool, error) {
return 0, false, nil
}

return blockID, !s.AheadOfProtocolVerifiedHead(blockID) &&
return blockID, !s.AheadOfHeadToSync(blockID) &&
!s.progressTracker.OutOfSync(), nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,5 @@ func (s *ChainSyncerTestSuite) RevertSnapshot() {
}

func (s *ChainSyncerTestSuite) TestAheadOfProtocolVerifiedHead() {
s.True(s.s.AheadOfProtocolVerifiedHead(0))
s.True(s.s.AheadOfHeadToSync(0))
}

0 comments on commit 5d05226

Please sign in to comment.