Skip to content

Commit

Permalink
replace use of rootcmd.execute and move logic to run package
Browse files Browse the repository at this point in the history
  • Loading branch information
kc1116 committed Jul 10, 2024
1 parent 78fb541 commit 3b6d707
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 107 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap/cmd/rootblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func rootBlock(cmd *cobra.Command, args []string) {
log.Info().Msg("")

log.Info().Msg("constructing root QCs for collection node clusters")
clusterQCs := common.ConstructRootQCsForClusters(log, clusters, internalNodes, clusterBlocks)
clusterQCs := run.ConstructRootQCsForClusters(log, clusters, internalNodes, clusterBlocks)
log.Info().Msg("")

log.Info().Msg("constructing root header")
Expand Down
200 changes: 200 additions & 0 deletions cmd/bootstrap/run/epochs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package run

import (
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/cadence"

Check failure on line 8 in cmd/bootstrap/run/epochs.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not `goimports`-ed with -local github.com/onflow/flow-go (goimports)
"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/model/bootstrap"
model "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol/inmem"
)

// GenerateRecoverEpochTxArgs generates the required transaction arguments for the `recoverEpoch` transaction.
func GenerateRecoverEpochTxArgs(log zerolog.Logger,
internalNodePrivInfoDir string,
nodeConfigJson string,
collectionClusters int,
epochCounter uint64,
rootChainID flow.ChainID,
numViewsInStakingAuction uint64,
numViewsInEpoch uint64,
targetDuration uint64,
targetEndTime uint64,
initNewEpoch bool,
snapshot *inmem.Snapshot) ([]cadence.Value, error) {
epoch := snapshot.Epochs().Current()

currentEpochIdentities, err := snapshot.Identities(filter.IsValidProtocolParticipant)
if err != nil {
return nil, fmt.Errorf("failed to get valid protocol participants from snapshot: %w", err)
}

// separate collector nodes by internal and partner nodes
collectors := currentEpochIdentities.Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
internalCollectors := make(flow.IdentityList, 0)
partnerCollectors := make(flow.IdentityList, 0)

log.Info().Msg("collecting internal node network and staking keys")
internalNodes, err := common.ReadFullInternalNodeInfos(log, internalNodePrivInfoDir, nodeConfigJson)
if err != nil {
return nil, fmt.Errorf("failed to read full internal node infos: %w", err)
}

internalNodesMap := make(map[flow.Identifier]struct{})
for _, node := range internalNodes {
if !currentEpochIdentities.Exists(node.Identity()) {
return nil, fmt.Errorf("node ID found in internal node infos missing from protocol snapshot identities %s: %w", node.NodeID, err)
}
internalNodesMap[node.NodeID] = struct{}{}
}
log.Info().Msg("")

for _, collector := range collectors {
if _, ok := internalNodesMap[collector.NodeID]; ok {
internalCollectors = append(internalCollectors, collector)
} else {
partnerCollectors = append(partnerCollectors, collector)
}
}

currentEpochDKG, err := epoch.DKG()
if err != nil {
return nil, fmt.Errorf("failed to get DKG for current epoch: %w", err)
}

log.Info().Msg("computing collection node clusters")

assignments, clusters, err := common.ConstructClusterAssignment(log, partnerCollectors, internalCollectors, collectionClusters)
if err != nil {
log.Fatal().Err(err).Msg("unable to generate cluster assignment")
}
log.Info().Msg("")

log.Info().Msg("constructing root blocks for collection node clusters")
clusterBlocks := GenerateRootClusterBlocks(epochCounter, clusters)
log.Info().Msg("")

log.Info().Msg("constructing root QCs for collection node clusters")
clusterQCs := ConstructRootQCsForClusters(log, clusters, internalNodes, clusterBlocks)
log.Info().Msg("")

dkgPubKeys := make([]cadence.Value, 0)
nodeIds := make([]cadence.Value, 0)

// NOTE: The RecoveryEpoch will re-use the last successful DKG output. This means that the consensus
// committee in the RecoveryEpoch must be identical to the committee which participated in that DKG.
dkgGroupKeyCdc, cdcErr := cadence.NewString(currentEpochDKG.GroupKey().String())
if cdcErr != nil {
log.Fatal().Err(cdcErr).Msg("failed to get dkg group key cadence string")
}
dkgPubKeys = append(dkgPubKeys, dkgGroupKeyCdc)
for _, id := range currentEpochIdentities {
if id.GetRole() == flow.RoleConsensus {
dkgPubKey, keyShareErr := currentEpochDKG.KeyShare(id.GetNodeID())
if keyShareErr != nil {
log.Fatal().Err(keyShareErr).Msg(fmt.Sprintf("failed to get dkg pub key share for node: %s", id.GetNodeID()))
}
dkgPubKeyCdc, cdcErr := cadence.NewString(dkgPubKey.String())
if cdcErr != nil {
log.Fatal().Err(cdcErr).Msg(fmt.Sprintf("failed to get dkg pub key cadence string for node: %s", id.GetNodeID()))
}
dkgPubKeys = append(dkgPubKeys, dkgPubKeyCdc)
}
nodeIdCdc, err := cadence.NewString(id.GetNodeID().String())
if err != nil {
log.Fatal().Err(err).Msg(fmt.Sprintf("failed to convert node ID to cadence string: %s", id.GetNodeID()))
}
nodeIds = append(nodeIds, nodeIdCdc)
}

clusterQCAddress := systemcontracts.SystemContractsForChain(flow.ChainID(rootChainID)).ClusterQC.Address.String()
qcVoteData, err := common.ConvertClusterQcsCdc(clusterQCs, clusters, clusterQCAddress)
if err != nil {
log.Fatal().Err(err).Msg("failed to convert cluster qcs to cadence type")
}

currEpochFinalView, err := epoch.FinalView()
if err != nil {
log.Fatal().Err(err).Msg("failed to get final view of current epoch")
}

args := []cadence.Value{
// epoch start view
cadence.NewUInt64(currEpochFinalView + 1),
// staking phase end view
cadence.NewUInt64(currEpochFinalView + numViewsInStakingAuction),
// epoch end view
cadence.NewUInt64(currEpochFinalView + numViewsInEpoch),
// target duration
cadence.NewUInt64(targetDuration),
// target end time
cadence.NewUInt64(targetEndTime),
// clusters,
common.ConvertClusterAssignmentsCdc(assignments),
// qcVoteData
cadence.NewArray(qcVoteData),
// dkg pub keys
cadence.NewArray(dkgPubKeys),
// node ids
cadence.NewArray(nodeIds),
// recover the network by initializing a new recover epoch which will increment the smart contract epoch counter
// or overwrite the epoch metadata for the current epoch
cadence.NewBool(initNewEpoch),
}

return args, nil
}

// ConstructRootQCsForClusters constructs a root QC for each cluster in the list.
// Args:
// - log: the logger instance.
// - clusterList: list of clusters
// - nodeInfos: list of NodeInfos (must contain all internal nodes)
// - clusterBlocks: list of root blocks for each cluster
// Returns:
// - flow.AssignmentList: the generated assignment list.
// - flow.ClusterList: the generate collection cluster list.
func ConstructRootQCsForClusters(log zerolog.Logger, clusterList flow.ClusterList, nodeInfos []bootstrap.NodeInfo, clusterBlocks []*cluster.Block) []*flow.QuorumCertificate {

if len(clusterBlocks) != len(clusterList) {
log.Fatal().Int("len(clusterBlocks)", len(clusterBlocks)).Int("len(clusterList)", len(clusterList)).
Msg("number of clusters needs to equal number of cluster blocks")
}

qcs := make([]*flow.QuorumCertificate, len(clusterBlocks))
for i, cluster := range clusterList {
signers := filterClusterSigners(cluster, nodeInfos)

qc, err := GenerateClusterRootQC(signers, cluster, clusterBlocks[i])
if err != nil {
log.Fatal().Err(err).Int("cluster index", i).Msg("generating collector cluster root QC failed")
}
qcs[i] = qc
}

return qcs
}

// Filters a list of nodes to include only nodes that will sign the QC for the
// given cluster. The resulting list of nodes is only nodes that are in the
// given cluster AND are not partner nodes (ie. we have the private keys).
func filterClusterSigners(cluster flow.IdentitySkeletonList, nodeInfos []model.NodeInfo) []model.NodeInfo {
var filtered []model.NodeInfo
for _, node := range nodeInfos {
_, isInCluster := cluster.ByNodeID(node.NodeID)
isNotPartner := node.Type() == model.NodeInfoTypePrivate

if isInCluster && isNotPartner {
filtered = append(filtered, node)
}
}

return filtered
}
52 changes: 0 additions & 52 deletions cmd/util/cmd/common/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/onflow/cadence"
cdcCommon "github.com/onflow/cadence/runtime/common"

"github.com/onflow/flow-go/cmd/bootstrap/run"
"github.com/onflow/flow-go/model/bootstrap"
model "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/assignment"
"github.com/onflow/flow-go/model/flow/factory"
Expand Down Expand Up @@ -115,36 +111,6 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes
return assignments, clusters, nil
}

// ConstructRootQCsForClusters constructs a root QC for each cluster in the list.
// Args:
// - log: the logger instance.
// - clusterList: list of clusters
// - nodeInfos: list of NodeInfos (must contain all internal nodes)
// - clusterBlocks: list of root blocks for each cluster
// Returns:
// - flow.AssignmentList: the generated assignment list.
// - flow.ClusterList: the generate collection cluster list.
func ConstructRootQCsForClusters(log zerolog.Logger, clusterList flow.ClusterList, nodeInfos []bootstrap.NodeInfo, clusterBlocks []*cluster.Block) []*flow.QuorumCertificate {

if len(clusterBlocks) != len(clusterList) {
log.Fatal().Int("len(clusterBlocks)", len(clusterBlocks)).Int("len(clusterList)", len(clusterList)).
Msg("number of clusters needs to equal number of cluster blocks")
}

qcs := make([]*flow.QuorumCertificate, len(clusterBlocks))
for i, cluster := range clusterList {
signers := filterClusterSigners(cluster, nodeInfos)

qc, err := run.GenerateClusterRootQC(signers, cluster, clusterBlocks[i])
if err != nil {
log.Fatal().Err(err).Int("cluster index", i).Msg("generating collector cluster root QC failed")
}
qcs[i] = qc
}

return qcs
}

// ConvertClusterAssignmentsCdc converts golang cluster assignments type to Cadence type `[[String]]`.
func ConvertClusterAssignmentsCdc(assignments flow.AssignmentList) cadence.Array {
stringArrayType := cadence.NewVariableSizedArrayType(cadence.StringType)
Expand Down Expand Up @@ -215,21 +181,3 @@ func newFlowClusterQCVoteDataStructType(clusterQcAddress string) *cadence.Struct
},
}
}

// Filters a list of nodes to include only nodes that will sign the QC for the
// given cluster. The resulting list of nodes is only nodes that are in the
// given cluster AND are not partner nodes (ie. we have the private keys).
func filterClusterSigners(cluster flow.IdentitySkeletonList, nodeInfos []model.NodeInfo) []model.NodeInfo {

var filtered []model.NodeInfo
for _, node := range nodeInfos {
_, isInCluster := cluster.ByNodeID(node.NodeID)
isNotPartner := node.Type() == model.NodeInfoTypePrivate

if isInCluster && isNotPartner {
filtered = append(filtered, node)
}
}

return filtered
}
23 changes: 19 additions & 4 deletions cmd/util/cmd/epochs/cmd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,24 @@ func getSnapshot() *inmem.Snapshot {
// generateRecoverEpochTxArgs generates recover epoch transaction arguments from a root protocol state snapshot and writes it to a JSON file
func generateRecoverEpochTxArgs(getSnapshot func() *inmem.Snapshot) func(cmd *cobra.Command, args []string) {
return func(cmd *cobra.Command, args []string) {
// extract arguments from recover epoch tx from snapshot
txArgs := extractRecoverEpochArgs(getSnapshot())

// generate transaction arguments
txArgs, err := run.GenerateRecoverEpochTxArgs(
log,
flagInternalNodePrivInfoDir,
flagNodeConfigJson,
flagCollectionClusters,
flagEpochCounter,
flow.ChainID(flagRootChainID),
flagNumViewsInStakingAuction,
flagNumViewsInEpoch,
flagTargetDuration,
flagTargetEndTime,
flagInitNewEpoch,
getSnapshot(),
)
if err != nil {
log.Fatal().Err(err).Msg("failed to generate recover epoch transaction arguments")
}
// encode to JSON
encodedTxArgs, err := epochcmdutil.EncodeArgs(txArgs)
if err != nil {
Expand Down Expand Up @@ -229,7 +244,7 @@ func extractRecoverEpochArgs(snapshot *inmem.Snapshot) []cadence.Value {
log.Info().Msg("")

log.Info().Msg("constructing root QCs for collection node clusters")
clusterQCs := common.ConstructRootQCsForClusters(log, clusters, internalNodes, clusterBlocks)
clusterQCs := run.ConstructRootQCsForClusters(log, clusters, internalNodes, clusterBlocks)
log.Info().Msg("")

dkgPubKeys := make([]cadence.Value, 0)
Expand Down
20 changes: 10 additions & 10 deletions integration/tests/epochs/base_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type BaseSuite struct {
suite.Suite
lib.TestnetStateTracker
cancel context.CancelFunc
log zerolog.Logger
Log zerolog.Logger
Net *testnet.FlowNetwork
ghostID flow.Identifier

Expand All @@ -42,7 +42,7 @@ type BaseSuite struct {
DKGPhaseLen uint64
EpochLen uint64
EpochCommitSafetyThreshold uint64
NumOfCollectionClusters uint64
NumOfCollectionClusters int
// Whether approvals are required for sealing (we only enable for VN tests because
// requiring approvals requires a longer DKG period to avoid flakiness)
RequiredSealApprovals uint // defaults to 0 (no approvals required)
Expand All @@ -61,10 +61,10 @@ func (s *BaseSuite) SetupTest() {
require.Greater(s.T(), s.EpochLen, minEpochLength+s.EpochCommitSafetyThreshold, "epoch too short")

s.Ctx, s.cancel = context.WithCancel(context.Background())
s.log = unittest.LoggerForTest(s.Suite.T(), zerolog.InfoLevel)
s.log.Info().Msg("================> SetupTest")
s.Log = unittest.LoggerForTest(s.Suite.T(), zerolog.InfoLevel)
s.Log.Info().Msg("================> SetupTest")
defer func() {
s.log.Info().Msg("================> Finish SetupTest")
s.Log.Info().Msg("================> Finish SetupTest")
}()

collectionConfigs := []func(*testnet.NodeConfig){
Expand Down Expand Up @@ -115,15 +115,15 @@ func (s *BaseSuite) SetupTest() {

s.Client = client

// log network info periodically to aid in debugging future flaky tests
go lib.LogStatusPeriodically(s.T(), s.Ctx, s.log, s.Client, 5*time.Second)
// Log network info periodically to aid in debugging future flaky tests
go lib.LogStatusPeriodically(s.T(), s.Ctx, s.Log, s.Client, 5*time.Second)
}

func (s *BaseSuite) TearDownTest() {
s.log.Info().Msg("================> Start TearDownTest")
s.Log.Info().Msg("================> Start TearDownTest")
s.Net.Remove()
s.cancel()
s.log.Info().Msg("================> Finish TearDownTest")
s.Log.Info().Msg("================> Finish TearDownTest")
}

func (s *BaseSuite) Ghost() *client.GhostClient {
Expand All @@ -135,7 +135,7 @@ func (s *BaseSuite) Ghost() *client.GhostClient {
// TimedLogf logs the message using t.Log and the suite logger, but prefixes the current time.
// This enables viewing logs inline with Docker logs as well as other test logs.
func (s *BaseSuite) TimedLogf(msg string, args ...interface{}) {
s.log.Info().Msgf(msg, args...)
s.Log.Info().Msgf(msg, args...)
args = append([]interface{}{time.Now().String()}, args...)
s.T().Logf("%s - "+msg, args...)
}
Expand Down
Loading

0 comments on commit 3b6d707

Please sign in to comment.