Skip to content

Commit 0dc42cf

Browse files
committed
Merge branch 'master' of github.com:AndriiDiachuk/flow-go into deriveTransactionStatus-clean-up-blockID-argument
2 parents c17d631 + ad12394 commit 0dc42cf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1968
-540
lines changed

admin/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,9 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
109109
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get_config" }}'
110110
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "set_config", "limit": 1, "burst": 1 }}'
111111
```
112+
113+
### To create a protocol snapshot for latest checkpoint (execution node only)
114+
```
115+
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot"}'
116+
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot", "data": { "blocks-to-skip": 10 }}'
117+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/rs/zerolog"
8+
9+
"github.com/onflow/flow-go/admin"
10+
"github.com/onflow/flow-go/admin/commands"
11+
"github.com/onflow/flow-go/cmd/util/common"
12+
"github.com/onflow/flow-go/state/protocol"
13+
"github.com/onflow/flow-go/state/protocol/inmem"
14+
"github.com/onflow/flow-go/storage"
15+
"github.com/onflow/flow-go/utils/logging"
16+
)
17+
18+
var _ commands.AdminCommand = (*ProtocolSnapshotCommand)(nil)
19+
20+
type protocolSnapshotData struct {
21+
blocksToSkip uint
22+
}
23+
24+
// ProtocolSnapshotCommand is a command that generates a protocol snapshot for a checkpoint (usually latest checkpoint)
25+
// This command is only available for execution node
26+
type ProtocolSnapshotCommand struct {
27+
logger zerolog.Logger
28+
state protocol.State
29+
headers storage.Headers
30+
seals storage.Seals
31+
checkpointDir string // the directory where the checkpoint is stored
32+
}
33+
34+
func NewProtocolSnapshotCommand(
35+
logger zerolog.Logger,
36+
state protocol.State,
37+
headers storage.Headers,
38+
seals storage.Seals,
39+
checkpointDir string,
40+
) *ProtocolSnapshotCommand {
41+
return &ProtocolSnapshotCommand{
42+
logger: logger,
43+
state: state,
44+
headers: headers,
45+
seals: seals,
46+
checkpointDir: checkpointDir,
47+
}
48+
}
49+
50+
func (s *ProtocolSnapshotCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
51+
validated, ok := req.ValidatorData.(*protocolSnapshotData)
52+
if !ok {
53+
return nil, fmt.Errorf("fail to parse validator data")
54+
}
55+
56+
blocksToSkip := validated.blocksToSkip
57+
58+
s.logger.Info().Uint("blocksToSkip", blocksToSkip).Msgf("admintool: generating protocol snapshot")
59+
60+
snapshot, sealedHeight, commit, err := common.GenerateProtocolSnapshotForCheckpoint(
61+
s.logger, s.state, s.headers, s.seals, s.checkpointDir, blocksToSkip)
62+
if err != nil {
63+
return nil, fmt.Errorf("could not generate protocol snapshot for checkpoint, checkpointDir %v: %w",
64+
s.checkpointDir, err)
65+
}
66+
67+
header, err := snapshot.Head()
68+
if err != nil {
69+
return nil, fmt.Errorf("could not get header from snapshot: %w", err)
70+
}
71+
72+
serializable, err := inmem.FromSnapshot(snapshot)
73+
if err != nil {
74+
return nil, fmt.Errorf("could not convert snapshot to serializable: %w", err)
75+
}
76+
77+
s.logger.Info().
78+
Uint64("finalized_height", header.Height). // finalized height
79+
Hex("finalized_block_id", logging.Entity(header)).
80+
Uint64("sealed_height", sealedHeight).
81+
Hex("sealed_commit", commit[:]). // not the commit for the finalized height, but for the sealed height
82+
Uint("blocks_to_skip", blocksToSkip).
83+
Msgf("admintool: protocol snapshot generated successfully")
84+
85+
return commands.ConvertToMap(serializable.Encodable())
86+
}
87+
88+
func (s *ProtocolSnapshotCommand) Validator(req *admin.CommandRequest) error {
89+
// blocksToSkip is the number of blocks to skip when iterating the sealed heights to find the state commitment
90+
// in the checkpoint file.
91+
// default is 0
92+
validated := &protocolSnapshotData{
93+
blocksToSkip: uint(0),
94+
}
95+
96+
input, ok := req.Data.(map[string]interface{})
97+
if ok {
98+
data, ok := input["blocks-to-skip"]
99+
100+
if ok {
101+
n, ok := data.(float64)
102+
if !ok {
103+
return fmt.Errorf("could not parse blocks-to-skip: %v", data)
104+
}
105+
validated.blocksToSkip = uint(n)
106+
}
107+
}
108+
109+
req.ValidatorData = validated
110+
111+
return nil
112+
}

cmd/access/node_builder/access_node_builder.go

+15-8
Original file line numberDiff line numberDiff line change
@@ -904,17 +904,23 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
904904

905905
builder.stateStreamBackend, err = statestreambackend.New(
906906
node.Logger,
907-
builder.stateStreamConf,
908907
node.State,
909908
node.Storage.Headers,
910909
node.Storage.Seals,
911910
node.Storage.Results,
912911
builder.ExecutionDataStore,
913912
executionDataStoreCache,
914-
broadcaster,
915913
builder.RegistersAsyncStore,
916914
builder.EventsIndex,
917915
useIndex,
916+
int(builder.stateStreamConf.RegisterIDsRequestLimit),
917+
subscription.NewSubscriptionHandler(
918+
builder.Logger,
919+
broadcaster,
920+
builder.stateStreamConf.ClientSendTimeout,
921+
builder.stateStreamConf.ResponseLimit,
922+
builder.stateStreamConf.ClientSendBufferSize,
923+
),
918924
executionDataTracker,
919925
)
920926
if err != nil {
@@ -1638,12 +1644,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
16381644
ScriptExecutionMode: scriptExecMode,
16391645
EventQueryMode: eventQueryMode,
16401646
BlockTracker: blockTracker,
1641-
SubscriptionParams: backend.SubscriptionParams{
1642-
Broadcaster: broadcaster,
1643-
SendTimeout: builder.stateStreamConf.ClientSendTimeout,
1644-
ResponseLimit: builder.stateStreamConf.ResponseLimit,
1645-
SendBufferSize: int(builder.stateStreamConf.ClientSendBufferSize),
1646-
},
1647+
SubscriptionHandler: subscription.NewSubscriptionHandler(
1648+
builder.Logger,
1649+
broadcaster,
1650+
builder.stateStreamConf.ClientSendTimeout,
1651+
builder.stateStreamConf.ResponseLimit,
1652+
builder.stateStreamConf.ClientSendBufferSize,
1653+
),
16471654
EventsIndex: builder.EventsIndex,
16481655
TxResultQueryMode: txResultQueryMode,
16491656
TxResultsIndex: builder.TxResultsIndex,

cmd/execution_builder.go

+9
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,15 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
192192
AdminCommand("get-transactions", func(conf *NodeConfig) commands.AdminCommand {
193193
return storageCommands.NewGetTransactionsCommand(conf.State, conf.Storage.Payloads, conf.Storage.Collections)
194194
}).
195+
AdminCommand("protocol-snapshot", func(conf *NodeConfig) commands.AdminCommand {
196+
return storageCommands.NewProtocolSnapshotCommand(
197+
conf.Logger,
198+
conf.State,
199+
conf.Storage.Headers,
200+
conf.Storage.Seals,
201+
exeNode.exeConf.triedir,
202+
)
203+
}).
195204
Module("mutable follower state", exeNode.LoadMutableFollowerState).
196205
Module("system specs", exeNode.LoadSystemSpecs).
197206
Module("execution metrics", exeNode.LoadExecutionMetrics).

cmd/observer/node_builder/observer_builder.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1408,17 +1408,23 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
14081408

14091409
builder.stateStreamBackend, err = statestreambackend.New(
14101410
node.Logger,
1411-
builder.stateStreamConf,
14121411
node.State,
14131412
node.Storage.Headers,
14141413
node.Storage.Seals,
14151414
node.Storage.Results,
14161415
builder.ExecutionDataStore,
14171416
executionDataStoreCache,
1418-
broadcaster,
14191417
builder.RegistersAsyncStore,
14201418
builder.EventsIndex,
14211419
useIndex,
1420+
int(builder.stateStreamConf.RegisterIDsRequestLimit),
1421+
subscription.NewSubscriptionHandler(
1422+
builder.Logger,
1423+
broadcaster,
1424+
builder.stateStreamConf.ClientSendTimeout,
1425+
builder.stateStreamConf.ResponseLimit,
1426+
builder.stateStreamConf.ClientSendBufferSize,
1427+
),
14221428
executionDataTracker,
14231429
)
14241430
if err != nil {

cmd/util/cmd/read-protocol-state/cmd/snapshot.go

+37
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@ import (
55
"github.com/spf13/cobra"
66

77
"github.com/onflow/flow-go/cmd/util/cmd/common"
8+
commonFuncs "github.com/onflow/flow-go/cmd/util/common"
9+
"github.com/onflow/flow-go/model/flow"
810
"github.com/onflow/flow-go/state/protocol"
911
"github.com/onflow/flow-go/state/protocol/inmem"
1012
)
1113

14+
var flagCheckpointDir string
15+
var flagCheckpointScanStep uint
16+
var flagCheckpointScanEndHeight int64
17+
1218
var SnapshotCmd = &cobra.Command{
1319
Use: "snapshot",
1420
Short: "Read snapshot from protocol state",
@@ -26,6 +32,15 @@ func init() {
2632

2733
SnapshotCmd.Flags().BoolVar(&flagSealed, "sealed", false,
2834
"get sealed block")
35+
36+
SnapshotCmd.Flags().StringVar(&flagCheckpointDir, "checkpoint-dir", "",
37+
"(execution node only) get snapshot from the latest checkpoint file in the given checkpoint directory")
38+
39+
SnapshotCmd.Flags().UintVar(&flagCheckpointScanStep, "checkpoint-scan-step", 0,
40+
"(execution node only) scan step for finding sealed height by checkpoint (use with --checkpoint-dir flag)")
41+
42+
SnapshotCmd.Flags().Int64Var(&flagCheckpointScanEndHeight, "checkpoint-scan-end-height", -1,
43+
"(execution node only) scan end height for finding sealed height by checkpoint (use with --checkpoint-dir flag)")
2944
}
3045

3146
func runSnapshot(*cobra.Command, []string) {
@@ -49,6 +64,28 @@ func runSnapshot(*cobra.Command, []string) {
4964
} else if flagSealed {
5065
log.Info().Msgf("get last sealed snapshot")
5166
snapshot = state.Sealed()
67+
} else if flagCheckpointDir != "" {
68+
log.Info().Msgf("get snapshot for latest checkpoint in directory %v (step: %v, endHeight: %v)",
69+
flagCheckpointDir, flagCheckpointScanStep, flagCheckpointScanEndHeight)
70+
var protocolSnapshot protocol.Snapshot
71+
var sealedHeight uint64
72+
var sealedCommit flow.StateCommitment
73+
if flagCheckpointScanEndHeight < 0 {
74+
// using default end height which is the last sealed height
75+
protocolSnapshot, sealedHeight, sealedCommit, err = commonFuncs.GenerateProtocolSnapshotForCheckpoint(
76+
log.Logger, state, storages.Headers, storages.Seals, flagCheckpointDir, flagCheckpointScanStep)
77+
} else {
78+
// using customized end height
79+
protocolSnapshot, sealedHeight, sealedCommit, err = commonFuncs.GenerateProtocolSnapshotForCheckpointWithHeights(
80+
log.Logger, state, storages.Headers, storages.Seals, flagCheckpointDir, flagCheckpointScanStep, uint64(flagCheckpointScanEndHeight))
81+
}
82+
83+
if err != nil {
84+
log.Fatal().Err(err).Msgf("could not generate protocol snapshot for checkpoint in dir: %v", flagCheckpointDir)
85+
}
86+
87+
snapshot = protocolSnapshot
88+
log.Info().Msgf("snapshot found, sealed height %v, commit %x", sealedHeight, sealedCommit)
5289
}
5390

5491
head, err := snapshot.Head()

0 commit comments

Comments
 (0)