Skip to content

Commit

Permalink
feat(dot/sync): Implement warp sync strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
dimartiro committed Oct 22, 2024
1 parent 1ff0b25 commit 4a532cf
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 1 deletion.
42 changes: 42 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package messages
import (
"fmt"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/client/consensus/grandpa"
"github.com/ChainSafe/gossamer/internal/primitives/core/hash"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
)
Expand All @@ -15,6 +18,12 @@ type WarpProofRequest struct {
Begin common.Hash
}

func NewWarpProofRequest(from common.Hash) *WarpProofRequest {
return &WarpProofRequest{
Begin: from,
}
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
Expand All @@ -37,4 +46,37 @@ func (wpr *WarpProofRequest) String() string {
return fmt.Sprintf("WarpProofRequest begin=%v", wpr.Begin)
}

type WarpSyncFragment struct {
Header types.Header
Justification grandpa.GrandpaJustification[hash.H256, uint64]
}

type WarpSyncProof struct {
Proofs []WarpSyncFragment
// indicates whether the warp sync has been completed
IsFinished bool
proofsLength int
}

func (wsp *WarpSyncProof) Decode(in []byte) error {
return scale.Unmarshal(in, wsp)
}

func (wsp *WarpSyncProof) Encode() ([]byte, error) {
if wsp == nil {
return nil, fmt.Errorf("cannot encode nil WarpSyncProof")
}
return scale.Marshal(*wsp)
}

func (wsp *WarpSyncProof) String() string {
if wsp == nil {
return "WarpSyncProof=nil"
}

return fmt.Sprintf("WarpSyncProof proofs=%v isFinished=%v proofsLength=%v",
wsp.Proofs, wsp.IsFinished, wsp.proofsLength)
}

var _ P2PMessage = (*WarpSyncProof)(nil)
var _ P2PMessage = (*WarpProofRequest)(nil)
10 changes: 10 additions & 0 deletions dot/peerset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@ const (
// SameBlockSyncRequest used when a peer send us more than the max number of the same request.
SameBlockSyncRequest Reputation = math.MinInt32
SameBlockSyncRequestReason = "same block sync request"

// UnexpectedResponseValue is used when peer send an unexpected response.
UnexpectedResponseValue Reputation = -(1 << 29)
// UnexpectedResponseReason is used when peer send an unexpected response.
UnexpectedResponseReason = "Unexpected response"

// BadWarpProofValue is used when peer send invalid warp sync proof.
BadWarpProofValue Reputation = -(1 << 29)
// BadWarpProofReason is used when peer send invalid warp sync proof.
BadWarpProofReason = "Bad warp proof"
)
2 changes: 1 addition & 1 deletion dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (f *FullSyncStrategy) NextActions() ([]*SyncTask, error) {
}

// our best block is equal or ahead of current target.
// in the node's pov we are not legging behind so there's nothing to do
// in the node's pov we are not lagging behind so there's nothing to do
// or we didn't receive block announces, so lets ask for more blocks
if uint32(bestBlockHeader.Number) >= currentTarget { //nolint:gosec
return f.createTasks(reqsFromQueue), nil
Expand Down
1 change: 1 addition & 0 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (s *SyncService) runStrategy() {
s.currentStrategy.ShowMetrics()
logger.Trace("finish process to acquire more blocks")

// TODO: why not use s.currentStrategy.IsSynced()?
if done {
s.currentStrategy = s.defaultStrategy
}
Expand Down
235 changes: 235 additions & 0 deletions dot/sync/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package sync

import (
"slices"
"time"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
consensus_grandpa "github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
primitives "github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
"github.com/ChainSafe/gossamer/lib/grandpa"
"github.com/libp2p/go-libp2p/core/peer"
)

type WarpSyncPhase uint

const (
WarpProof = iota
TargetBlock
Completed
)

type WarpSyncStrategy struct {
// Strategy dependencies and config
peers *peerViewSet
badBlocks []string
reqMaker network.RequestMaker
warpSyncProvider grandpa.WarpSyncProofProvider

// Warp sync state
startedAt time.Time
phase WarpSyncPhase
syncedFragments int
setId consensus_grandpa.SetID
authorities primitives.AuthorityList
lastBlock types.Header
result types.BlockData
}

type WarpSyncConfig struct {
Telemetry Telemetry
BadBlocks []string
RequestMaker network.RequestMaker
}

// NewWarpSyncStrategy returns a new warp sync strategy
func NewWarpSyncStrategy(cfg *WarpSyncConfig) *WarpSyncStrategy {
return &WarpSyncStrategy{
badBlocks: cfg.BadBlocks,
reqMaker: cfg.RequestMaker,
peers: &peerViewSet{
view: make(map[peer.ID]peerView),
target: 0,
},
startedAt: time.Now(),
}
}

// OnBlockAnnounce on every new block announce received
// Synce it is a warp sync strategy, we are going to only update the peerset reputation
// And peers target block
func (w *WarpSyncStrategy) OnBlockAnnounce(from peer.ID, msg *network.BlockAnnounceMessage) (
repChange *Change, err error) {
blockAnnounceHeader := types.NewHeader(msg.ParentHash, msg.StateRoot, msg.ExtrinsicsRoot, msg.Number, msg.Digest)
blockAnnounceHeaderHash := blockAnnounceHeader.Hash()

logger.Infof("received block announce from %s: #%d (%s) best block: %v",
from,
blockAnnounceHeader.Number,
blockAnnounceHeaderHash,
msg.BestBlock,
)

if slices.Contains(w.badBlocks, blockAnnounceHeaderHash.String()) {
logger.Infof("bad block received from %s: #%d (%s) is a bad block",
from, blockAnnounceHeader.Number, blockAnnounceHeaderHash)

return &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.BadBlockAnnouncementValue,
Reason: peerset.BadBlockAnnouncementReason,
},
}, errBadBlockReceived
}

if msg.BestBlock {
w.peers.update(from, blockAnnounceHeaderHash, uint32(blockAnnounceHeader.Number)) //nolint:gosec
}

return &Change{
who: from,
rep: peerset.ReputationChange{
Value: peerset.GossipSuccessValue,
Reason: peerset.GossipSuccessReason,
},
}, nil
}

func (w *WarpSyncStrategy) OnBlockAnnounceHandshake(from peer.ID, msg *network.BlockAnnounceHandshake) error {
w.peers.update(from, msg.BestBlockHash, msg.BestBlockNumber)
return nil
}

// NextActions returns the next actions to be taken by the sync service
func (w *WarpSyncStrategy) NextActions() ([]*SyncTask, error) {
w.startedAt = time.Now()

var task SyncTask
switch w.phase {
case WarpProof:
task = SyncTask{
request: messages.NewWarpProofRequest(w.lastBlock.Hash()),
response: &messages.WarpSyncProof{},
requestMaker: w.reqMaker,
}
case TargetBlock:
req := messages.NewBlockRequest(
*messages.NewFromBlock(w.lastBlock.Hash()),
1,
messages.RequestedDataHeader+
messages.RequestedDataBody+
messages.RequestedDataJustification,
messages.Ascending,
)
task = SyncTask{
request: req,
response: &messages.BlockResponseMessage{},
requestMaker: w.reqMaker,
}
}

return []*SyncTask{&task}, nil
}

// Process processes the results of the sync tasks, getting the best warp sync response and
// Updating our block state
func (w *WarpSyncStrategy) Process(results []*SyncTaskResult) (
done bool, repChanges []Change, bans []peer.ID, err error) {
switch w.phase {
case WarpProof:
var warpProofResult *network.WarpSyncVerificationResult

repChanges, bans, warpProofResult = w.validateWarpSyncResults(results)
if !warpProofResult.Completed {
// Partial warp proof
w.setId = warpProofResult.SetId
w.authorities = warpProofResult.AuthorityList
w.lastBlock = warpProofResult.Header
} else {
w.phase = TargetBlock
w.lastBlock = warpProofResult.Header
}
case TargetBlock:
var validRes []RequestResponseData

repChanges, bans, validRes = validateResults(results, w.badBlocks)

// TODO: check if this can cause an issue
w.result = *validRes[0].responseData[0]
w.phase = Completed
}

return w.IsSynced(), repChanges, bans, nil
}

func (w *WarpSyncStrategy) validateWarpSyncResults(results []*SyncTaskResult) (
repChanges []Change, peersToBlock []peer.ID, result *network.WarpSyncVerificationResult) {
repChanges = make([]Change, 0)
peersToBlock = make([]peer.ID, 0)
bestProof := &messages.WarpSyncProof{}
bestResult := &network.WarpSyncVerificationResult{}

for _, result := range results {
switch response := result.response.(type) {
case *messages.WarpSyncProof:
if !result.completed {
continue
}

// If invalid warp sync proof, then we should block the peer and update its reputation
encodedProof, err := response.Encode()
if err != nil {
// This should never happen since the proof is already decoded without issues
panic("fail to encode warp proof")
}

// Best proof will be the finished proof or the proof with more fragments
res, err := w.warpSyncProvider.Verify(encodedProof, w.setId, w.authorities)

if err != nil {
repChanges = append(repChanges, Change{
who: result.who,
rep: peerset.ReputationChange{
Value: peerset.BadWarpProofValue,
Reason: peerset.BadWarpProofReason,
}})
peersToBlock = append(peersToBlock, result.who)
}

if response.IsFinished || len(response.Proofs) > len(bestProof.Proofs) {
bestProof = response
bestResult = res
}
default:
repChanges = append(repChanges, Change{
who: result.who,
rep: peerset.ReputationChange{
Value: peerset.UnexpectedResponseValue,
Reason: peerset.UnexpectedResponseReason,
}})
peersToBlock = append(peersToBlock, result.who)
continue
}
}

return repChanges, peersToBlock, bestResult
}

func (w *WarpSyncStrategy) ShowMetrics() {
totalSyncSeconds := time.Since(w.startedAt).Seconds()

fps := float64(w.syncedFragments) / totalSyncSeconds
logger.Infof("⛓️ synced %d warp sync fragments "+
"took: %.2f seconds, fps: %.2f fragments/second, target best block number #%d",
w.syncedFragments, totalSyncSeconds, fps, w.lastBlock.Number)
}

func (w *WarpSyncStrategy) IsSynced() bool {
return w.phase == Completed
}

var _ Strategy = (*WarpSyncStrategy)(nil)

0 comments on commit 4a532cf

Please sign in to comment.