Skip to content

Commit

Permalink
Merge branch 'yurii/4649-todos-and-refactoring-part-1' of https://git…
Browse files Browse the repository at this point in the history
…hub.com/onflow/flow-go into yurii/4649-prev-epoch-refactoring-attempt
  • Loading branch information
durkmurder committed Oct 19, 2023
2 parents dd04ca4 + 244af7b commit e765750
Show file tree
Hide file tree
Showing 19 changed files with 148 additions and 121 deletions.
4 changes: 2 additions & 2 deletions cmd/bootstrap/run/cluster_qc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func GenerateClusterRootQC(signers []bootstrap.NodeInfo, allCommitteeMembers flo
}

// STEP 1.5: patch committee to include dynamic identities. This is a temporary measure until bootstrapping is refactored.
// We need to do this since the committee is used to create the QC uses dynamic identities, but clustering for root block contain only
// static identities since there no state transitions haven't happened yet.
// We need a Committee for creating the cluster's root QC and the Committee requires dynamic identities to be instantiated.
// The clustering for root block contain only static identities, since there no state transitions have happened yet.
dynamicCommitteeMembers := make(flow.IdentityList, 0, len(allCommitteeMembers))
for _, participant := range allCommitteeMembers {
dynamicCommitteeMembers = append(dynamicCommitteeMembers, &flow.Identity{
Expand Down
95 changes: 52 additions & 43 deletions consensus/hotstuff/committees/cluster_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ import (
// implementation reference blocks on the cluster chain, which in turn reference
// blocks on the main chain - this implementation manages that translation.
type Cluster struct {
state protocol.State
payloads storage.ClusterPayloads
me flow.Identifier
// pre-computed leader selection for the full lifecycle of the cluster
selection *leader.LeaderSelection
// a filter that returns all members of the cluster committee allowed to vote
clusterMemberFilter flow.IdentityFilter[flow.Identity]
// initial set of cluster members, WITHOUT dynamic weight changes
initialClusterMembers flow.IdentitySkeletonList
state protocol.State
payloads storage.ClusterPayloads
me flow.Identifier
selection *leader.LeaderSelection // pre-computed leader selection for the full lifecycle of the cluster

clusterMembers flow.IdentitySkeletonList // cluster members in canonical order as specified by the epoch smart contract
clusterMemberFilter flow.IdentityFilter[flow.Identity] // filter that returns true for all members of the cluster committee allowed to vote
weightThresholdForQC uint64 // computed based on initial cluster committee weights
weightThresholdForTO uint64 // computed based on initial cluster committee weights

// initialClusterIdentities lists full Identities for cluster members (in canonical order) at time of cluster initialization by Epoch smart contract
initialClusterIdentities flow.IdentityList
weightThresholdForQC uint64 // computed based on initial cluster committee weights
weightThresholdForTO uint64 // computed based on initial cluster committee weights
}

var _ hotstuff.Replicas = (*Cluster)(nil)
Expand All @@ -44,7 +44,6 @@ func NewClusterCommittee(
epoch protocol.Epoch,
me flow.Identifier,
) (*Cluster, error) {

selection, err := leader.SelectionForCluster(cluster, epoch)
if err != nil {
return nil, fmt.Errorf("could not compute leader selection for cluster: %w", err)
Expand All @@ -53,18 +52,8 @@ func NewClusterCommittee(
initialClusterMembers := cluster.Members()
totalWeight := initialClusterMembers.TotalWeight()
initialClusterMembersSelector := initialClusterMembers.Selector()
// the next section is not very nice, but there are no dynamic identities for root block,
// and we need them to specificially handle querying of identities for root block
initialClusterIdentities := make(flow.IdentityList, 0, len(cluster.Members()))
for _, skeleton := range initialClusterMembers {
initialClusterIdentities = append(initialClusterIdentities, &flow.Identity{
IdentitySkeleton: *skeleton,
DynamicIdentity: flow.DynamicIdentity{
Weight: skeleton.InitialWeight,
Ejected: false,
},
})
}
initialClusterIdentities := constructInitialClusterIdentities(initialClusterMembers)

com := &Cluster{
state: state,
payloads: payloads,
Expand All @@ -76,7 +65,7 @@ func NewClusterCommittee(
filter.Not(filter.Ejected),
filter.HasWeight(true),
),
initialClusterMembers: initialClusterMembers,
clusterMembers: initialClusterMembers,
initialClusterIdentities: initialClusterIdentities,
weightThresholdForQC: WeightThresholdToBuildQC(totalWeight),
weightThresholdForTO: WeightThresholdToTimeout(totalWeight),
Expand All @@ -90,17 +79,14 @@ func (c *Cluster) IdentitiesByBlock(blockID flow.Identifier) (flow.IdentityList,
// blockID is a collection block not a block produced by consensus,
// to query the identities from protocol state, we need to use the reference block id from the payload
//
// first retrieve the cluster block payload
// first retrieve the cluster block's payload
payload, err := c.payloads.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not get cluster payload: %w", err)
}

// an empty reference block ID indicates a root block
isRootBlock := payload.ReferenceBlockID == flow.ZeroID

// use the initial cluster members for root block
if isRootBlock {
// An empty reference block ID indicates a root block. In this case, use the initial cluster members for root block
if isRootBlock := payload.ReferenceBlockID == flow.ZeroID; isRootBlock {
return c.initialClusterIdentities, nil
}

Expand All @@ -110,18 +96,14 @@ func (c *Cluster) IdentitiesByBlock(blockID flow.Identifier) (flow.IdentityList,
}

func (c *Cluster) IdentityByBlock(blockID flow.Identifier, nodeID flow.Identifier) (*flow.Identity, error) {

// first retrieve the cluster block payload
// first retrieve the cluster block's payload
payload, err := c.payloads.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not get cluster payload: %w", err)
}

// an empty reference block ID indicates a root block
isRootBlock := payload.ReferenceBlockID == flow.ZeroID

// use the initial cluster members for root block
if isRootBlock {
// An empty reference block ID indicates a root block. In this case, use the initial cluster members for root block
if isRootBlock := payload.ReferenceBlockID == flow.ZeroID; isRootBlock {
identity, ok := c.initialClusterIdentities.ByNodeID(nodeID)
if !ok {
return nil, model.NewInvalidSignerErrorf("node %v is not an authorized hotstuff participant", nodeID)
Expand All @@ -143,11 +125,12 @@ func (c *Cluster) IdentityByBlock(blockID flow.Identifier, nodeID flow.Identifie
return identity, nil
}

// IdentitiesByEpoch returns the initial cluster members for this epoch. The view
// parameter is the view in the cluster consensus. Since clusters only exist for
// one epoch, we don't need to check the view.
// IdentitiesByEpoch returns the IdentitySkeletons of the cluster members in canonical order.
// This represents the cluster composition at the time the cluster was specified by the epoch smart
// contract (hence, we return IdentitySkeletons as opposed to full identities). Since clusters only
// exist for one epoch, we don't need to check the view.
func (c *Cluster) IdentitiesByEpoch(_ uint64) (flow.IdentitySkeletonList, error) {
return c.initialClusterMembers, nil
return c.clusterMembers, nil
}

// IdentityByEpoch returns the node from the initial cluster members for this epoch.
Expand All @@ -158,7 +141,7 @@ func (c *Cluster) IdentitiesByEpoch(_ uint64) (flow.IdentitySkeletonList, error)
// - model.InvalidSignerError if nodeID was not listed by the Epoch Setup event as an
// authorized participant in this cluster
func (c *Cluster) IdentityByEpoch(view uint64, participantID flow.Identifier) (*flow.IdentitySkeleton, error) {
identity, ok := c.initialClusterMembers.ByNodeID(participantID)
identity, ok := c.clusterMembers.ByNodeID(participantID)
if !ok {
return nil, model.NewInvalidSignerErrorf("node %v is not an authorized hotstuff participant", participantID)
}
Expand Down Expand Up @@ -196,3 +179,29 @@ func (c *Cluster) Self() flow.Identifier {
func (c *Cluster) DKG(_ uint64) (hotstuff.DKG, error) {
panic("queried DKG of cluster committee")
}

// constructInitialClusterIdentities extends the IdentitySkeletons of the cluster members to their full Identities
// (in canonical order). at time of cluster initialization by Epoch smart contract. This represents the cluster
// composition at the time the cluster was specified by the epoch smart contract.
//
// CONTEXT: The EpochSetup event contains the IdentitySkeletons for each cluster, thereby specifying cluster membership.
// While ejection status and dynamic weight are not part of the EpochSetup event, we can supplement this information as follows:
// - Per convention, service events are delivered (asynchronously) in an *order-preserving* manner. Furthermore, weight changes or
// node ejection is also mediated by system smart contracts and delivered via service events.
// - Therefore, the EpochSetup event contains the up-to-date snapshot of the cluster members. Any weight changes or node ejection
// that happened before should be reflected in the EpochSetup event. Specifically, the initial weight should be reduced and ejected
// nodes should be no longer listed in the EpochSetup event. Hence, when the EpochSetup event is emitted / processed, the weight of
// all cluster members equals their InitialWeight and the Ejected flag is false.
func constructInitialClusterIdentities(clusterMembers flow.IdentitySkeletonList) flow.IdentityList {
initialClusterIdentities := make(flow.IdentityList, 0, len(clusterMembers))
for _, skeleton := range clusterMembers {
initialClusterIdentities = append(initialClusterIdentities, &flow.Identity{
IdentitySkeleton: *skeleton,
DynamicIdentity: flow.DynamicIdentity{
Weight: skeleton.InitialWeight,
Ejected: false,
},
})
}
return initialClusterIdentities
}
2 changes: 1 addition & 1 deletion consensus/hotstuff/committees/consensus_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newStaticEpochInfo(epoch protocol.Epoch) (*staticEpochInfo, error) {
if err != nil {
return nil, fmt.Errorf("could not initial identities: %w", err)
}
initialCommittee := initialIdentities.Filter(filter.IsAllowedConsensusCommitteeMember).ToSkeleton()
initialCommittee := initialIdentities.Filter(filter.IsConsensusCommitteeMember).ToSkeleton()
dkg, err := epoch.DKG()
if err != nil {
return nil, fmt.Errorf("could not get dkg: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/committees/leader/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func SelectionForConsensus(epoch protocol.Epoch) (*LeaderSelection, error) {
firstView,
rng,
int(finalView-firstView+1), // add 1 because both first/final view are inclusive
identities.Filter(filter.IsAllowedConsensusCommitteeMember),
identities.Filter(filter.IsConsensusCommitteeMember),
)
return leaders, err
}
1 change: 0 additions & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ func createNode(
rootQC, err := rootSnapshot.QuorumCertificate()
require.NoError(t, err)

// selector := filter.HasRole[flow.Identity](flow.RoleConsensus)
committee, err := committees.NewConsensusCommittee(state, localID)
require.NoError(t, err)
protocolStateEvents.AddConsumer(committee)
Expand Down
13 changes: 5 additions & 8 deletions engine/collection/test/cluster_switchover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,11 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf)
t: t,
conf: conf,
}

identityRoles := unittest.CompleteIdentitySet(unittest.IdentityListFixture(int(conf.collectors), unittest.WithRole(flow.RoleCollection))...)
identities := flow.IdentityList{}
for _, missingRole := range identityRoles {
nodeInfo := unittest.PrivateNodeInfosFixture(1, unittest.WithRole(missingRole.Role))[0]
tc.nodeInfos = append(tc.nodeInfos, nodeInfo)
identities = append(identities, nodeInfo.Identity())
}
tc.nodeInfos = unittest.PrivateNodeInfosFromIdentityList(
unittest.CompleteIdentitySet(
unittest.IdentityListFixture(int(conf.collectors), unittest.WithRole(flow.RoleCollection))...),
)
identities := model.ToIdentityList(tc.nodeInfos)
collectors := identities.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)).ToSkeleton()
assignment := unittest.ClusterAssignment(tc.conf.clusters, collectors)
clusters, err := factory.NewClusterList(assignment, collectors)
Expand Down
2 changes: 1 addition & 1 deletion engine/consensus/dkg/reactor_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (e *ReactorEngine) startDKGForEpoch(currentEpochCounter uint64, first *flow
log.Fatal().Err(err).Msg("could not retrieve epoch info")
}

committee := curDKGInfo.identities.Filter(filter.IsAllowedConsensusCommitteeMember)
committee := curDKGInfo.identities.Filter(filter.IsConsensusCommitteeMember)

log.Info().
Uint64("phase1", curDKGInfo.phase1FinalView).
Expand Down
60 changes: 24 additions & 36 deletions engine/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,22 @@ func TestExecutionFlow(t *testing.T) {

chainID := flow.Testnet

colID := unittest.PrivateNodeInfosFixture(
1,
colID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleCollection),
unittest.WithKeys,
)[0]
conID := unittest.PrivateNodeInfosFixture(
1,
)
conID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleConsensus),
unittest.WithKeys,
)[0]
exeID := unittest.PrivateNodeInfosFixture(
1,
)
exeID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleExecution),
unittest.WithKeys,
)[0]
verID := unittest.PrivateNodeInfosFixture(
1,
)
verID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleVerification),
unittest.WithKeys,
)[0]
)

identities := unittest.CompleteIdentitySet(colID.Identity(), conID.Identity(), exeID.Identity(), verID.Identity()).
Sort(order.Canonical[flow.Identity])
Expand Down Expand Up @@ -357,21 +353,18 @@ func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {

chainID := flow.Emulator

colNodeInfo := unittest.PrivateNodeInfosFixture(
1,
colNodeInfo := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleCollection),
unittest.WithKeys,
)[0]
conNodeInfo := unittest.PrivateNodeInfosFixture(
1,
)
conNodeInfo := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleConsensus),
unittest.WithKeys,
)[0]
exe1NodeInfo := unittest.PrivateNodeInfosFixture(
1,
)
exe1NodeInfo := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleExecution),
unittest.WithKeys,
)[0]
)

colID := colNodeInfo.Identity()
conID := conNodeInfo.Identity()
Expand Down Expand Up @@ -516,31 +509,26 @@ func TestBroadcastToMultipleVerificationNodes(t *testing.T) {

chainID := flow.Emulator

colID := unittest.PrivateNodeInfosFixture(
1,
colID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleCollection),
unittest.WithKeys,
)[0]
conID := unittest.PrivateNodeInfosFixture(
1,
)
conID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleConsensus),
unittest.WithKeys,
)[0]
exeID := unittest.PrivateNodeInfosFixture(
1,
)
exeID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleExecution),
unittest.WithKeys,
)[0]
ver1ID := unittest.PrivateNodeInfosFixture(
1,
)
ver1ID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleVerification),
unittest.WithKeys,
)[0]
ver2ID := unittest.PrivateNodeInfosFixture(
1,
)
ver2ID := unittest.PrivateNodeInfoFixture(
unittest.WithRole(flow.RoleVerification),
unittest.WithKeys,
)[0]
)

identities := unittest.CompleteIdentitySet(colID.Identity(),
conID.Identity(),
Expand Down
7 changes: 3 additions & 4 deletions engine/verification/utils/unittest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,14 +628,13 @@ func bootstrapSystem(
bootstrapNodesInfo := make([]bootstrap.NodeInfo, 0)
var verID bootstrap.NodeInfo
for _, missingRole := range unittest.CompleteIdentitySet() {
nodeInfo := unittest.PrivateNodeInfosFixture(1, unittest.WithRole(missingRole.Role))[0]
nodeInfo := unittest.PrivateNodeInfoFixture(unittest.WithRole(missingRole.Role))
if nodeInfo.Role == flow.RoleVerification {
verID = nodeInfo
}
bootstrapNodesInfo = append(bootstrapNodesInfo, nodeInfo)
}
bootstrapNodesInfo = append(bootstrapNodesInfo, unittest.PrivateNodeInfosFixture(1, unittest.WithRole(flow.RoleExecution))...) // adds extra execution node

bootstrapNodesInfo = append(bootstrapNodesInfo, unittest.PrivateNodeInfoFixture(unittest.WithRole(flow.RoleExecution))) // adds extra execution node
identities := bootstrap.ToIdentityList(bootstrapNodesInfo)

collector := &metrics.NoopCollector{}
Expand All @@ -645,7 +644,7 @@ func bootstrapSystem(

if !authorized {
// creates a new verification node identity that is unauthorized for this epoch
verID = unittest.PrivateNodeInfosFixture(1, unittest.WithRole(flow.RoleVerification))[0]
verID = unittest.PrivateNodeInfoFixture(unittest.WithRole(flow.RoleVerification))
bootstrapNodesInfo = append(bootstrapNodesInfo, verID)
identities = append(identities, verID.Identity())

Expand Down
2 changes: 1 addition & 1 deletion insecure/corruptnet/network_egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// The attacker is mocked out in this test.
func TestHandleOutgoingEvent_AttackerRegistered(t *testing.T) {
codec := unittest.NetworkCodec()
corruptedIdentity := unittest.PrivateNodeInfosFixture(1, unittest.WithAddress(insecure.DefaultAddress))[0]
corruptedIdentity := unittest.PrivateNodeInfoFixture(unittest.WithAddress(insecure.DefaultAddress))
flowNetwork := mocknetwork.NewNetwork(t)
ccf := mockinsecure.NewCorruptConduitFactory(t)
ccf.On("RegisterEgressController", mock.Anything).Return(nil)
Expand Down
2 changes: 1 addition & 1 deletion insecure/corruptnet/network_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func corruptNetworkFixture(t *testing.T, logger zerolog.Logger, corruptedID ...f
// create corruptible network with no attacker registered
codec := unittest.NetworkCodec()

corruptedIdentity := unittest.PrivateNodeInfosFixture(1, unittest.WithAddress(insecure.DefaultAddress))[0]
corruptedIdentity := unittest.PrivateNodeInfoFixture(unittest.WithAddress(insecure.DefaultAddress))
// some tests will want to create corruptible network with a specific ID
if len(corruptedID) > 0 {
corruptedIdentity.NodeID = corruptedID[0]
Expand Down
2 changes: 1 addition & 1 deletion insecure/integration/tests/composability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestCorruptNetworkFrameworkHappyPath(t *testing.T) {
// withCorruptNetwork creates a real corrupt network, starts it, runs the "run" function, and then stops it.
func withCorruptNetwork(t *testing.T, run func(*testing.T, flow.Identity, *corruptnet.Network, *stub.Hub)) {
codec := unittest.NetworkCodec()
corruptedIdentity := unittest.PrivateNodeInfosFixture(1, unittest.WithAddress(insecure.DefaultAddress))[0]
corruptedIdentity := unittest.PrivateNodeInfoFixture(unittest.WithAddress(insecure.DefaultAddress))

// life-cycle management of orchestratorNetwork.
ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading

0 comments on commit e765750

Please sign in to comment.