Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dynamic Protocol State] EpochStateContainer stores epoch active identities #4834

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ff90836
WIP. Updated prev epoch to contain active identities. Updated ActiveI…
durkmurder Oct 18, 2023
d8046b5
Updated storage layer assertions and process of reconstructing identi…
durkmurder Oct 18, 2023
641dbca
Changed how state updater processes epoch setup event. Updated tests
durkmurder Oct 18, 2023
dd04ca4
Changed how identities are applied to protocol state. Fixed some tests
durkmurder Oct 18, 2023
e765750
Merge branch 'yurii/4649-todos-and-refactoring-part-1' of https://git…
durkmurder Oct 19, 2023
09fdfbe
model/flow/protocol_state.go:
Oct 20, 2023
d9ae36d
minor comment revision
Oct 20, 2023
15e3e33
Fixed tests
durkmurder Oct 20, 2023
0dd823a
Merge pull request #4854 from onflow/alex/4649-prev-epoch-refactoring…
durkmurder Oct 20, 2023
d3aa8d4
Merge branch 'feature/dynamic-protocol-state' into yurii/4649-prev-ep…
durkmurder Oct 23, 2023
95dbf1e
Added test for BuildIdentityTable
durkmurder Oct 23, 2023
df2bfab
Changed order of arguments in BuildIdentityTable
durkmurder Oct 23, 2023
25624f2
Linted
durkmurder Oct 23, 2023
f92636d
Apply suggestions from code review
durkmurder Oct 23, 2023
0172dd1
Changed expected error types of ProcessEpochSetup and ProcessEpochCom…
durkmurder Oct 23, 2023
d51fcfa
Merge branch 'yurii/4649-prev-epoch-refactoring-attempt' of https://g…
durkmurder Oct 23, 2023
f6d519b
updated goDoc of method `BuildIdentityTable` to completely describe a…
Oct 23, 2023
49934ce
updated code-internal documentation to precisely explain the identity…
Oct 23, 2023
c638874
minor re-ordering of the logic and in-code documentation of function …
Oct 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions consensus/integration/epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/flow-go/model/encodable"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/flow/mapfunc"
"github.com/onflow/flow-go/model/flow/order"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
Expand Down Expand Up @@ -220,7 +219,6 @@ func withNextEpoch(
encodableSnapshot := snapshot.Encodable()

currEpoch := &encodableSnapshot.Epochs.Current // take pointer so assignments apply
currentEpochIdentities, _ := snapshot.Identities(filter.Any)
nextEpochIdentities = nextEpochIdentities.Sort(order.Canonical[flow.Identity])

currEpoch.FinalView = currEpoch.FirstView + curEpochViews - 1 // first epoch lasts curEpochViews
Expand Down Expand Up @@ -249,27 +247,6 @@ func withNextEpoch(
// update protocol state
protocolState := encodableSnapshot.ProtocolState

// set identities for root snapshot to include next epoch identities,
// since we are in committed phase
protocolState.CurrentEpoch.ActiveIdentities = flow.DynamicIdentityEntryListFromIdentities(
append(
// all the current epoch identities
currentEpochIdentities,
// and all the NEW identities in next epoch, with 0 weight
nextEpochIdentities.
Filter(filter.Not(filter.In[flow.Identity](currentEpochIdentities))).
Map(mapfunc.WithWeight(0))...,
).Sort(order.Canonical[flow.Identity]))

nextEpochIdentities = append(
// all the next epoch identities
nextEpochIdentities,
// and all identities from current epoch, with 0 weight
currentEpochIdentities.
Filter(filter.Not(filter.In(nextEpochIdentities))).
Map(mapfunc.WithWeight(0))...,
).Sort(order.Canonical[flow.Identity])

// setup ID has changed, need to update it
convertedEpochSetup, _ := protocol.ToEpochSetup(inmem.NewEpoch(*currEpoch))
protocolState.CurrentEpoch.SetupID = convertedEpochSetup.ID()
Expand Down
2 changes: 0 additions & 2 deletions model/flow/filter/identity.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED

package filter

import (
Expand Down
4 changes: 2 additions & 2 deletions model/flow/mapfunc/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func WithInitialWeight(weight uint64) flow.IdentityMapFunc[flow.Identity] {
}

// WithWeight returns an anonymous function that assigns the given weight value
// to `Identity.Weight`. This function is primarily intended for testing, as
// Identity structs should be immutable by convention.
// to `Identity.Weight`. We pass the input identity by value, i.e. copy on write,
// to avoid modifying the original input.
func WithWeight(weight uint64) flow.IdentityMapFunc[flow.Identity] {
return func(identity flow.Identity) flow.Identity {
identity.Weight = weight
Expand Down
192 changes: 103 additions & 89 deletions model/flow/protocol_state.go

Large diffs are not rendered by default.

103 changes: 95 additions & 8 deletions model/flow/protocol_state_test.go
AlexHentschel marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package flow_test

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/order"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -29,12 +31,12 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
})
}
stateEntry := &flow.ProtocolStateEntry{
PreviousEpoch: nil,
CurrentEpoch: flow.EpochStateContainer{
SetupID: setup.ID(),
CommitID: currentEpochCommit.ID(),
ActiveIdentities: identities,
},
PreviousEpochEventIDs: flow.EventIDs{},
InvalidStateTransitionAttempted: false,
}
entry, err := flow.NewRichProtocolStateEntry(
Expand All @@ -47,7 +49,7 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
nil,
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(identities, setup.Participants, nil)
expectedIdentities, err := flow.BuildIdentityTable(setup.Participants, identities, nil, nil)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, entry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants")
})
Expand All @@ -69,9 +71,10 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.PreviousEpochSetup.Participants,
stateEntry.PreviousEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + previous epoch setup participants")
Expand Down Expand Up @@ -99,17 +102,19 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + next epoch setup participants")
assert.Nil(t, richEntry.NextEpochCommit)
expectedIdentities, err = flow.BuildIdentityTable(
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.NextEpochIdentityTable, "should be equal to next epoch setup participants + current epoch setup participants")
Expand All @@ -135,16 +140,18 @@ func TestNewRichProtocolStateEntry(t *testing.T) {
)
assert.NoError(t, err)
expectedIdentities, err := flow.BuildIdentityTable(
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.CurrentEpochIdentityTable, "should be equal to current epoch setup participants + next epoch setup participants")
expectedIdentities, err = flow.BuildIdentityTable(
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.NextEpochSetup.Participants,
stateEntry.NextEpoch.ActiveIdentities,
stateEntry.CurrentEpochSetup.Participants,
stateEntry.CurrentEpoch.ActiveIdentities,
)
assert.NoError(t, err)
assert.Equal(t, expectedIdentities, richEntry.NextEpochIdentityTable, "should be equal to next epoch setup participants + current epoch setup participants")
Expand All @@ -161,7 +168,7 @@ func TestProtocolStateEntry_Copy(t *testing.T) {
cpy := entry.Copy()
assert.Equal(t, entry, cpy)
assert.NotSame(t, entry.NextEpoch, cpy.NextEpoch)
assert.NotSame(t, entry.PreviousEpochEventIDs, cpy.PreviousEpochEventIDs)
assert.NotSame(t, entry.PreviousEpoch, cpy.PreviousEpoch)
assert.NotSame(t, entry.CurrentEpoch, cpy.CurrentEpoch)

cpy.InvalidStateTransitionAttempted = !entry.InvalidStateTransitionAttempted
Expand All @@ -180,3 +187,83 @@ func TestProtocolStateEntry_Copy(t *testing.T) {
})
assert.NotEqual(t, entry.CurrentEpoch.ActiveIdentities, cpy.CurrentEpoch.ActiveIdentities)
}

// TestBuildIdentityTable tests if BuildIdentityTable returns a correct identity, whenever we pass arguments with or without
// overlap. It also tests if the function returns an error when the arguments are not ordered in the same order.
func TestBuildIdentityTable(t *testing.T) {
t.Run("happy-path-no-identities-overlap", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])

identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.NoError(t, err)

expectedIdentities := targetEpochIdentities.Union(adjacentEpochIdentities.Map(func(identity flow.Identity) flow.Identity {
identity.Weight = 0
return identity
}))
assert.Equal(t, expectedIdentities, identityList)
})
t.Run("happy-path-identities-overlap", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentities := unittest.IdentityListFixture(10)
sampledIdentities, err := targetEpochIdentities.Sample(2)
// change address so we can assert that we take identities from target epoch and not adjacent epoch
for i, identity := range sampledIdentities.Copy() {
identity.Address = fmt.Sprintf("%d", i)
adjacentEpochIdentities = append(adjacentEpochIdentities, identity)
}
assert.NoError(t, err)

identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.NoError(t, err)

expectedIdentities := targetEpochIdentities.Union(adjacentEpochIdentities.Map(func(identity flow.Identity) flow.Identity {
identity.Weight = 0
return identity
}))
assert.Equal(t, expectedIdentities, identityList)
})
t.Run("target-epoch-identities-not-ordered", func(t *testing.T) {
targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
targetEpochIdentitySkeletons, err := targetEpochIdentities.ToSkeleton().Shuffle()
assert.NoError(t, err)
targetEpochDynamicIdentities := flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities)

adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
identityList, err := flow.BuildIdentityTable(
targetEpochIdentitySkeletons,
targetEpochDynamicIdentities,
adjacentEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities),
)
assert.Error(t, err)
assert.Empty(t, identityList)
})
t.Run("adjacent-epoch-identities-not-ordered", func(t *testing.T) {
adjacentEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
adjacentEpochIdentitySkeletons, err := adjacentEpochIdentities.ToSkeleton().Shuffle()
assert.NoError(t, err)
adjacentEpochDynamicIdentities := flow.DynamicIdentityEntryListFromIdentities(adjacentEpochIdentities)

targetEpochIdentities := unittest.IdentityListFixture(10).Sort(order.Canonical[flow.Identity])
identityList, err := flow.BuildIdentityTable(
targetEpochIdentities.ToSkeleton(),
flow.DynamicIdentityEntryListFromIdentities(targetEpochIdentities),
adjacentEpochIdentitySkeletons,
adjacentEpochDynamicIdentities,
)
assert.Error(t, err)
assert.Empty(t, identityList)
})
}
10 changes: 10 additions & 0 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,11 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block, updater

err = updater.ProcessEpochSetup(ev)
if err != nil {
if protocol.IsInvalidServiceEventError(err) {
// we have observed an invalid service event, which triggers epoch fallback mode
updater.SetInvalidStateTransitionAttempted()
return dbUpdates, nil
}
Comment on lines +1122 to +1126
Copy link
Member

@AlexHentschel AlexHentschel Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️

I am a little worried about this:
At this point, we don't know that the updater's internal state is a valid Protocol State. For all we know, it could be complete garbage. There is no atomicity requirement on the StateUpdater as far as I can tell, in that it either applies the update entirely or not at all.

Suggestion:

  • I think it it is ok to treat all protocol state operations mutations in a block as one atomic state transition. It either succeeds in its entirety or in case of a failure, there is no update to the protocol state (except setting InvalidStateTransitionAttempted to true).
  • I think it is straight forward to extend the protocol state Updater to "drop all modifications if InvalidStateTransitionAttempted" and document this properly throughout the code base. In a nutshell, within the Build method I would check the state.InvalidStateTransitionAttempted first; in that case we could just return a copy of the parentState with only the state.InvalidStateTransitionAttempted set to true.
  • While the code changes are relatively small, we need to properly test this and add detailed documentation to the Updater, the corresponding interface, and ideally also the places where we call the Updater

Further thoughts:

  • If the updater is signalling that it encountered an invalid state transition via an InvalidStateTransitionAttempted, should the updater maybe already set its own InvalidStateTransitionAttempted flag? We would still raise the error to signal the failure to the caller. However, the caller would no longer be required to explicitly call updater.SetInvalidStateTransitionAttempted() (I'd leave that method, to allow Updater external logic to set this flag for other reasons).

I think we should not do this as part of the current PR. The PR is already big enough and we keep layering on changes which makes it hard to keep track of without re-reviewing the entire PR. Added todo to #4649

return nil, irrecoverable.NewExceptionf("could not process epoch setup event: %w", err)
}

Expand Down Expand Up @@ -1149,6 +1154,11 @@ func (m *FollowerState) handleEpochServiceEvents(candidate *flow.Block, updater

err = updater.ProcessEpochCommit(ev)
if err != nil {
if protocol.IsInvalidServiceEventError(err) {
// we have observed an invalid service event, which triggers epoch fallback mode
updater.SetInvalidStateTransitionAttempted()
return dbUpdates, nil
}
return nil, irrecoverable.NewExceptionf("could not process epoch commit event: %w", err)
}

Expand Down
22 changes: 22 additions & 0 deletions state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,28 @@ func TestExtendEpochSetupInvalid(t *testing.T) {
assertEpochEmergencyFallbackTriggered(t, state, true)
})
})

t.Run("participants not ordered (EECC)", func(t *testing.T) {
util.RunWithFullProtocolState(t, rootSnapshot, func(db *badger.DB, state *protocol.ParticipantState) {
block1, createSetup := setupState(t, db, state)

_, receipt, seal := createSetup(func(setup *flow.EpochSetup) {
var err error
setup.Participants, err = setup.Participants.Shuffle()
require.NoError(t, err)
})

receiptBlock, sealingBlock := unittest.SealBlock(t, state, block1, receipt, seal)
err := state.Finalize(context.Background(), receiptBlock.ID())
require.NoError(t, err)
// epoch fallback not triggered before finalization
assertEpochEmergencyFallbackTriggered(t, state, false)
err = state.Finalize(context.Background(), sealingBlock.ID())
require.NoError(t, err)
// epoch fallback triggered after finalization
assertEpochEmergencyFallbackTriggered(t, state, true)
})
})
}

// TestExtendEpochCommitInvalid tests that incorporating an invalid EpochCommit
Expand Down
2 changes: 1 addition & 1 deletion state/protocol/inmem/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func SnapshotFromBootstrapStateWithParams(
})
}
protocolState := &flow.ProtocolStateEntry{
PreviousEpochEventIDs: flow.EventIDs{},
PreviousEpoch: nil,
CurrentEpoch: flow.EpochStateContainer{
SetupID: setup.ID(),
CommitID: commit.ID(),
Expand Down
6 changes: 3 additions & 3 deletions state/protocol/inmem/dynamic_protocol_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {
entry := unittest.ProtocolStateFixture()
adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand All @@ -48,7 +48,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {

adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand All @@ -63,7 +63,7 @@ func TestDynamicProtocolStateAdapter(t *testing.T) {
entry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState())
adapter := inmem.NewDynamicProtocolStateAdapter(entry, globalParams)
status := adapter.EpochStatus()
assert.Equal(t, entry.PreviousEpochEventIDs, status.PreviousEpoch)
assert.Equal(t, entry.PreviousEpoch.EventIDs(), status.PreviousEpoch)
assert.Equal(t, flow.EventIDs{
SetupID: entry.CurrentEpoch.SetupID,
CommitID: entry.CurrentEpoch.CommitID,
Expand Down
6 changes: 4 additions & 2 deletions state/protocol/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ type StateUpdater interface {
// identities from previous+current epochs and start returning identities from current+next epochs.
// As a result of this operation protocol state for the next epoch will be created.
// CAUTION: Caller must validate input event.
// No errors are expected during normal operations.
// Expected errors during normal operations:
// - `protocol.InvalidServiceEventError` - an invalid service event with respect to the protocol state has been supplied.
ProcessEpochSetup(epochSetup *flow.EpochSetup) error
// ProcessEpochCommit updates current protocol state with data from epoch commit event.
// Observing an epoch setup commit, transitions protocol state from setup to commit phase, at this point we have
// finished construction of the next epoch.
// As a result of this operation protocol state for next epoch will be committed.
// CAUTION: Caller must validate input event.
// No errors are expected during normal operations.
// Expected errors during normal operations:
// - `protocol.InvalidServiceEventError` - an invalid service event with respect to the protocol state has been supplied.
ProcessEpochCommit(epochCommit *flow.EpochCommit) error
// UpdateIdentity updates identity table with new identity entry.
// Should pass identity which is already present in the table, otherwise an exception will be raised.
Expand Down
Loading
Loading