Skip to content

Commit

Permalink
Merge pull request #4834 from onflow/yurii/4649-prev-epoch-refactorin…
Browse files Browse the repository at this point in the history
…g-attempt
  • Loading branch information
durkmurder authored Oct 24, 2023
2 parents de3c468 + c638874 commit e210519
Show file tree
Hide file tree
Showing 15 changed files with 508 additions and 385 deletions.
23 changes: 0 additions & 23 deletions consensus/integration/epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/flow-go/model/encodable"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/flow/mapfunc"
"github.com/onflow/flow-go/model/flow/order"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
Expand Down Expand Up @@ -220,7 +219,6 @@ func withNextEpoch(
encodableSnapshot := snapshot.Encodable()

currEpoch := &encodableSnapshot.Epochs.Current // take pointer so assignments apply
currentEpochIdentities, _ := snapshot.Identities(filter.Any)
nextEpochIdentities = nextEpochIdentities.Sort(order.Canonical[flow.Identity])

currEpoch.FinalView = currEpoch.FirstView + curEpochViews - 1 // first epoch lasts curEpochViews
Expand Down Expand Up @@ -249,27 +247,6 @@ func withNextEpoch(
// update protocol state
protocolState := encodableSnapshot.ProtocolState

// set identities for root snapshot to include next epoch identities,
// since we are in committed phase
protocolState.CurrentEpoch.ActiveIdentities = flow.DynamicIdentityEntryListFromIdentities(
append(
// all the current epoch identities
currentEpochIdentities,
// and all the NEW identities in next epoch, with 0 weight
nextEpochIdentities.
Filter(filter.Not(filter.In[flow.Identity](currentEpochIdentities))).
Map(mapfunc.WithWeight(0))...,
).Sort(order.Canonical[flow.Identity]))

nextEpochIdentities = append(
// all the next epoch identities
nextEpochIdentities,
// and all identities from current epoch, with 0 weight
currentEpochIdentities.
Filter(filter.Not(filter.In(nextEpochIdentities))).
Map(mapfunc.WithWeight(0))...,
).Sort(order.Canonical[flow.Identity])

// setup ID has changed, need to update it
convertedEpochSetup, _ := protocol.ToEpochSetup(inmem.NewEpoch(*currEpoch))
protocolState.CurrentEpoch.SetupID = convertedEpochSetup.ID()
Expand Down
2 changes: 0 additions & 2 deletions model/flow/filter/identity.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED

package filter

import (
Expand Down
4 changes: 2 additions & 2 deletions model/flow/mapfunc/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func WithInitialWeight(weight uint64) flow.IdentityMapFunc[flow.Identity] {
}

// WithWeight returns an anonymous function that assigns the given weight value
// to `Identity.Weight`. This function is primarily intended for testing, as
// Identity structs should be immutable by convention.
// to `Identity.Weight`. We pass the input identity by value, i.e. copy on write,
// to avoid modifying the original input.
func WithWeight(weight uint64) flow.IdentityMapFunc[flow.Identity] {
return func(identity flow.Identity) flow.Identity {
identity.Weight = weight
Expand Down
192 changes: 103 additions & 89 deletions model/flow/protocol_state.go

Large diffs are not rendered by default.

103 changes: 95 additions & 8 deletions model/flow/protocol_state_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package flow_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/order"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -29,12 +31,12 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
})
}
stateEntry := &flow.ProtocolStateEntry{
PreviousEpoch: nil,
CurrentEpoch: flow.EpochStateContainer{
SetupID: setup.ID(),
CommitID: currentEpochCommit.ID(),
ActiveIdentities: identities,
},
PreviousEpochEventIDs: flow.EventIDs{},
InvalidStateTransitionAttempted: false,
}
entry, err := flow.NewRichProtocolStateEntry(
Expand All @@ -47,7 +49,7 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
nil,
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(identities, setup.Participants, nil)
expectedIdentities, err := flow.BuildIdentityTable(setup.Participants, identities, nil, nil)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, entry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants")
})
Expand All @@ -69,9 +71,10 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.PreviousEpochSetup.Participants,
stateEntry.PreviousEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + previous epoch setup participants")
Expand Down Expand Up @@ -99,17 +102,19 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + next epoch setup participants")
assert.Nil(t, richEntry.NextEpochCommit)
expectedIdentities, err = flow.BuildIdentityTable(
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.NextEpochIdentityTable, "should be equal to next epoch setup participants + current epoch setup participants")
Expand All @@ -135,16 +140,18 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + next epoch setup participants")
expectedIdentities, err = flow.BuildIdentityTable(
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.NextEpochIdentityTable, "should be equal to next epoch setup participants + current epoch setup participants")
Expand All @@ -161,7 +168,7 @@ func TestProtocolStateEntry_Copy(t *testing.T) {
cpy := entry.Copy()
assert.Equal(t, entry, cpy)
assert.NotSame(t, entry.NextEpoch, cpy.NextEpoch)
assert.NotSame(t, entry.PreviousEpochEventIDs, cpy.PreviousEpochEventIDs)
assert.NotSame(t, entry.PreviousEpoch, cpy.PreviousEpoch)
assert.NotSame(t, entry.CurrentEpoch, cpy.CurrentEpoch)

cpy.InvalidStateTransitionAttempted = !entry.InvalidStateTransitionAttempted
Expand All @@ -180,3 +187,83 @@ func TestProtocolStateEntry_Copy(t *testing.T) {
})
assert.NotEqual(t, entry.CurrentEpoch.ActiveIdentities, cpy.CurrentEpoch.ActiveIdentities)
}

// TestBuildIdentityTable tests if BuildIdentityTable returns a correct identity, whenever we pass arguments with or without
// overlap. It also tests if the function returns an error when the arguments are not ordered in the same order.
func TestBuildIdentityTable(t *testing.T) {
t.Run("happy-path-no-identities-overlap", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])

identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.NoError(t, err)

expectedIdentities := targetEpochIdentities.Union(adjacentEpochIdentities.Map(func(identity flow.Identity) flow.Identity {
identity.Weight = 0
return identity
}))
assert.Equal(t, expectedIdentities, identityList)
})
t.Run("happy-path-identities-overlap", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentities := unittest.IdentityListFixture(10)
sampledIdentities, err := targetEpochIdentities.Sample(2)
// change address so we can assert that we take identities from target epoch and not adjacent epoch
for i, identity := range sampledIdentities.Copy() {
identity.Address = fmt.Sprintf("%d", i)
adjacentEpochIdentities = append(adjacentEpochIdentities, identity)
}
assert.NoError(t, err)

identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.NoError(t, err)

expectedIdentities := targetEpochIdentities.Union(adjacentEpochIdentities.Map(func(identity flow.Identity) flow.Identity {
identity.Weight = 0
return identity
}))
assert.Equal(t, expectedIdentities, identityList)
})
t.Run("target-epoch-identities-not-ordered", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
targetEpochIdentitySkeletons, err := targetEpochIdentities.ToSkeleton().Shuffle()
assert.NoError(t, err)
targetEpochDynamicIdentities := flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities)

adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
identityList, err := flow.BuildIdentityTable(
targetEpochIdentitySkeletons,
targetEpochDynamicIdentities,
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.Error(t, err)
assert.Empty(t, identityList)
})
t.Run("adjacent-epoch-identities-not-ordered", func(t *testing.T) {
adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentitySkeletons, err := adjacentEpochIdentities.ToSkeleton().Shuffle()
assert.NoError(t, err)
adjacentEpochDynamicIdentities := flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities)

targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentitySkeletons,
adjacentEpochDynamicIdentities,
)
assert.Error(t, err)
assert.Empty(t, identityList)
})
}
12 changes: 10 additions & 2 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED

package badger

import (
Expand Down Expand Up @@ -1121,6 +1119,11 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block, updater

err = updater.ProcessEpochSetup(ev)
if err != nil {
if protocol.IsInvalidServiceEventError(err) {
// we have observed an invalid service event, which triggers epoch fallback mode
updater.SetInvalidStateTransitionAttempted()
return dbUpdates, nil
}
return nil, irrecoverable.NewExceptionf("could not process epoch setup event: %w", err)
}

Expand Down Expand Up @@ -1149,6 +1152,11 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block, updater

err = updater.ProcessEpochCommit(ev)
if err != nil {
if protocol.IsInvalidServiceEventError(err) {
// we have observed an invalid service event, which triggers epoch fallback mode
updater.SetInvalidStateTransitionAttempted()
return dbUpdates, nil
}
return nil, irrecoverable.NewExceptionf("could not process epoch commit event: %w", err)
}

Expand Down
22 changes: 22 additions & 0 deletions state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,28 @@ func TestExtendEpochSetupInvalid(t *testing.T) {
assertEpochEmergencyFallbackTriggered(t, state, true)
})
})

t.Run("participants not ordered (EECC)", func(t *testing.T) {
util.RunWithFullProtocolState(t, rootSnapshot, func(db *badger.DB, state *protocol.ParticipantState) {
block1, createSetup := setupState(t, db, state)

_, receipt, seal := createSetup(func(setup *flow.EpochSetup) {
var err error
setup.Participants, err = setup.Participants.Shuffle()
require.NoError(t, err)
})

receiptBlock, sealingBlock := unittest.SealBlock(t, state, block1, receipt, seal)
err := state.Finalize(context.Background(), receiptBlock.ID())
require.NoError(t, err)
// epoch fallback not triggered before finalization
assertEpochEmergencyFallbackTriggered(t, state, false)
err = state.Finalize(context.Background(), sealingBlock.ID())
require.NoError(t, err)
// epoch fallback triggered after finalization
assertEpochEmergencyFallbackTriggered(t, state, true)
})
})
}

// TestExtendEpochCommitInvalid tests that incorporating an invalid EpochCommit
Expand Down
2 changes: 1 addition & 1 deletion state/protocol/inmem/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func SnapshotFromBootstrapStateWithParams(
})
}
protocolState := &flow.ProtocolStateEntry{
PreviousEpochEventIDs: flow.EventIDs{},
PreviousEpoch: nil,
CurrentEpoch: flow.EpochStateContainer{
SetupID: setup.ID(),
CommitID: commit.ID(),
Expand Down
6 changes: 3 additions & 3 deletions state/protocol/inmem/dynamic_protocol_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {
entry := unittest.ProtocolStateFixture()
adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand All @@ -48,7 +48,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {

adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand All @@ -63,7 +63,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {
entry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState())
adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand Down
6 changes: 4 additions & 2 deletions state/protocol/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ type StateUpdater interface {
// identities from previous+current epochs and start returning identities from current+next epochs.
// As a result of this operation protocol state for the next epoch will be created.
// CAUTION: Caller must validate input event.
// No errors are expected during normal operations.
// Expected errors during normal operations:
// - `protocol.InvalidServiceEventError` if the service event is not a valid state transition for the current protocol state
ProcessEpochSetup(epochSetup *flow.EpochSetup) error
// ProcessEpochCommit updates current protocol state with data from epoch commit event.
// Observing an epoch setup commit, transitions protocol state from setup to commit phase, at this point we have
// finished construction of the next epoch.
// As a result of this operation protocol state for next epoch will be committed.
// CAUTION: Caller must validate input event.
// No errors are expected during normal operations.
// Expected errors during normal operations:
// - `protocol.InvalidServiceEventError` if the service event is not a valid state transition for the current protocol state
ProcessEpochCommit(epochCommit *flow.EpochCommit) error
// UpdateIdentity updates identity table with new identity entry.
// Should pass identity which is already present in the table, otherwise an exception will be raised.
Expand Down
Loading

0 comments on commit e210519

Please sign in to comment.