Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cf7138c

Browse files
committedMar 7, 2025·
cli: add upload-state command
Close #3782 Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
1 parent 9c02748 commit cf7138c

File tree

8 files changed

+269
-18
lines changed

8 files changed

+269
-18
lines changed
 

‎cli/server/dump_bin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func dumpBin(ctx *cli.Context) error {
3030
count := uint32(ctx.Uint("count"))
3131
start := uint32(ctx.Uint("start"))
3232

33-
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
33+
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
3434
if err != nil {
3535
return err
3636
}

‎cli/server/server.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -140,25 +140,26 @@ func newGraceContext() context.Context {
140140
return ctx
141141
}
142142

143-
func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
144-
chain, _, err := initBlockChain(cfg, log)
143+
// InitBCWithMetrics initializes the blockchain with metrics with the given configuration.
144+
func InitBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, *metrics.Service, *metrics.Service, error) {
145+
chain, store, err := initBlockChain(cfg, log)
145146
if err != nil {
146-
return nil, nil, nil, cli.Exit(err, 1)
147+
return nil, nil, nil, nil, cli.Exit(err, 1)
147148
}
148149
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
149150
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
150151

151152
go chain.Run()
152153
err = prometheus.Start()
153154
if err != nil {
154-
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
155+
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Prometheus service: %w", err), 1)
155156
}
156157
err = pprof.Start()
157158
if err != nil {
158-
return nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
159+
return nil, nil, nil, nil, cli.Exit(fmt.Errorf("failed to start Pprof service: %w", err), 1)
159160
}
160161

161-
return chain, prometheus, pprof, nil
162+
return chain, store, prometheus, pprof, nil
162163
}
163164

164165
func dumpDB(ctx *cli.Context) error {
@@ -189,7 +190,7 @@ func dumpDB(ctx *cli.Context) error {
189190
defer outStream.Close()
190191
writer := io.NewBinWriterFromIO(outStream)
191192

192-
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
193+
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
193194
if err != nil {
194195
return err
195196
}
@@ -249,7 +250,7 @@ func restoreDB(ctx *cli.Context) error {
249250
cfg.ApplicationConfiguration.SaveStorageBatch = true
250251
}
251252

252-
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
253+
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
253254
if err != nil {
254255
return err
255256
}
@@ -470,7 +471,7 @@ func startServer(ctx *cli.Context) error {
470471
return cli.Exit(err, 1)
471472
}
472473

473-
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
474+
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, log)
474475
if err != nil {
475476
return cli.Exit(err, 1)
476477
}

‎cli/server/server_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ func TestInitBCWithMetrics(t *testing.T) {
185185
})
186186

187187
t.Run("bad store", func(t *testing.T) {
188-
_, _, _, err = initBCWithMetrics(config.Config{}, logger)
188+
_, _, _, _, err = InitBCWithMetrics(config.Config{}, logger)
189189
require.Error(t, err)
190190
})
191191

192-
chain, prometheus, pprof, err := initBCWithMetrics(cfg, logger)
192+
chain, _, prometheus, pprof, err := InitBCWithMetrics(cfg, logger)
193193
require.NoError(t, err)
194194
t.Cleanup(func() {
195195
chain.Close()

‎cli/util/convert.go

+19
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ func NewCommands() []*cli.Command {
9090
}, options.RPC...)
9191
uploadBinFlags = append(uploadBinFlags, options.Wallet...)
9292
uploadBinFlags = append(uploadBinFlags, neoFSFlags...)
93+
94+
uploadStateFlags := append([]cli.Flag{
95+
&cli.StringFlag{
96+
Name: "state-attribute",
97+
Usage: "Attribute key of the state object",
98+
Value: neofs.DefaultStateAttribute,
99+
Action: cmdargs.EnsureNotEmpty("state-attribute"),
100+
},
101+
options.Debug, options.Config, options.ConfigFile, options.RelativePath,
102+
}, options.Wallet...)
103+
uploadStateFlags = append(uploadStateFlags, options.Network...)
104+
uploadStateFlags = append(uploadStateFlags, neoFSFlags...)
93105
return []*cli.Command{
94106
{
95107
Name: "util",
@@ -174,6 +186,13 @@ func NewCommands() []*cli.Command {
174186
Action: uploadBin,
175187
Flags: uploadBinFlags,
176188
},
189+
{
190+
Name: "upload-state",
191+
Usage: "Start the node, traverse MPT and upload MPT nodes to the NeoFS container at every StateSyncInterval number of blocks",
192+
UsageText: "neo-go util upload-state --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --state-attribute state --wallet <wallet> [--wallet-config <config>] [--address <address>] [--searchers <num>] [--retries <num>] [--debug] [--config-path path] [-p/-m/-t] [--config-file file]",
193+
Action: uploadState,
194+
Flags: uploadStateFlags,
195+
},
177196
},
178197
},
179198
}

‎cli/util/upload_state.go

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package util
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"time"
7+
8+
"github.com/nspcc-dev/neo-go/cli/cmdargs"
9+
"github.com/nspcc-dev/neo-go/cli/options"
10+
"github.com/nspcc-dev/neo-go/cli/server"
11+
"github.com/nspcc-dev/neo-go/pkg/core"
12+
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
13+
"github.com/nspcc-dev/neo-go/pkg/core/storage"
14+
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
15+
gio "github.com/nspcc-dev/neo-go/pkg/io"
16+
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
17+
"github.com/nspcc-dev/neo-go/pkg/util"
18+
"github.com/nspcc-dev/neofs-sdk-go/client"
19+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
20+
"github.com/nspcc-dev/neofs-sdk-go/object"
21+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
22+
"github.com/urfave/cli/v2"
23+
"go.uber.org/zap"
24+
)
25+
26+
func uploadState(ctx *cli.Context) error {
27+
if err := cmdargs.EnsureNone(ctx); err != nil {
28+
return err
29+
}
30+
cfg, err := options.GetConfigFromContext(ctx)
31+
if err != nil {
32+
return cli.Exit(err, 1)
33+
}
34+
attr := ctx.String("state-attribute")
35+
maxRetries := ctx.Uint("retries")
36+
debug := ctx.Bool("debug")
37+
38+
acc, _, err := options.GetAccFromContext(ctx)
39+
if err != nil {
40+
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
41+
}
42+
43+
signer, p, err := options.GetNeoFSClientPool(ctx, acc)
44+
if err != nil {
45+
return cli.Exit(err, 1)
46+
}
47+
defer p.Close()
48+
log, _, logCloser, err := options.HandleLoggingParams(debug, cfg.ApplicationConfiguration)
49+
if err != nil {
50+
return cli.Exit(err, 1)
51+
}
52+
if logCloser != nil {
53+
defer func() { _ = logCloser() }()
54+
}
55+
56+
chain, store, prometheus, pprof, err := server.InitBCWithMetrics(cfg, log)
57+
if err != nil {
58+
return err
59+
}
60+
defer func() {
61+
pprof.ShutDown()
62+
prometheus.ShutDown()
63+
chain.Close()
64+
}()
65+
66+
if chain.GetConfig().Ledger.KeepOnlyLatestState || chain.GetConfig().Ledger.RemoveUntraceableBlocks {
67+
return cli.Exit("only full-state node is supported: disable KeepOnlyLatestState and RemoveUntraceableBlocks", 1)
68+
}
69+
syncInterval := cfg.ProtocolConfiguration.StateSyncInterval
70+
if syncInterval == 0 {
71+
syncInterval = core.DefaultStateSyncInterval
72+
}
73+
74+
containerID, err := getContainer(ctx, p, strconv.Itoa(int(chain.GetConfig().Magic)), maxRetries, debug)
75+
if err != nil {
76+
return cli.Exit(err, 1)
77+
}
78+
79+
stateObjCount, err := searchStateIndex(ctx, p, containerID, acc.PrivateKey(), attr, syncInterval, maxRetries, debug)
80+
if err != nil {
81+
return cli.Exit(fmt.Sprintf("failed searching existing states: %v", err), 1)
82+
}
83+
stateModule := chain.GetStateModule()
84+
currentHeight := int(stateModule.CurrentLocalHeight())
85+
currentStateIndex := currentHeight / syncInterval
86+
if currentStateIndex <= stateObjCount {
87+
log.Info("no new states to upload",
88+
zap.Int("number of uploaded state objects", stateObjCount),
89+
zap.Int("latest state is uploaded for block", stateObjCount*syncInterval),
90+
zap.Int("current height", currentHeight),
91+
zap.Int("StateSyncInterval", syncInterval))
92+
return nil
93+
}
94+
log.Info("starting uploading",
95+
zap.Int("number of uploaded state objects", stateObjCount),
96+
zap.Int("next state to upload for block", stateObjCount*syncInterval),
97+
zap.Int("current height", currentHeight),
98+
zap.Int("StateSyncInterval", syncInterval),
99+
zap.Int("number of states to upload", currentStateIndex-stateObjCount))
100+
for state := stateObjCount; state < currentStateIndex; state++ {
101+
height := uint32(state * syncInterval)
102+
stateRoot, err := stateModule.GetStateRoot(height)
103+
if err != nil {
104+
return cli.Exit(fmt.Sprintf("failed to get state root for height %d: %v", height, err), 1)
105+
}
106+
h, err := chain.GetHeader(chain.GetHeaderHash(height))
107+
if err != nil {
108+
return cli.Exit(fmt.Sprintf("failed to get header %d: %v", height, err), 1)
109+
}
110+
111+
var (
112+
hdr object.Object
113+
prmObjectPutInit client.PrmObjectPutInit
114+
attrs = []object.Attribute{
115+
*object.NewAttribute(attr, strconv.Itoa(int(height))),
116+
*object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
117+
*object.NewAttribute("StateRoot", stateRoot.Root.StringLE()),
118+
*object.NewAttribute("StateSyncInterval", strconv.Itoa(syncInterval)),
119+
*object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)),
120+
}
121+
)
122+
hdr.SetContainerID(containerID)
123+
hdr.SetOwner(signer.UserID())
124+
hdr.SetAttributes(attrs...)
125+
err = retry(func() error {
126+
writer, err := p.ObjectPutInit(ctx.Context, hdr, signer, prmObjectPutInit)
127+
if err != nil {
128+
return err
129+
}
130+
start := time.Now()
131+
wrt := gio.NewBinWriterFromIO(writer)
132+
wrt.WriteB(byte(0))
133+
wrt.WriteU32LE(uint32(chain.GetConfig().Magic))
134+
wrt.WriteU32LE(height)
135+
wrt.WriteBytes(stateRoot.Root[:])
136+
err = traverseMPT(stateRoot.Root, store, wrt)
137+
if err != nil {
138+
_ = writer.Close()
139+
return err
140+
}
141+
err = writer.Close()
142+
if err != nil {
143+
return err
144+
}
145+
duration := time.Since(start)
146+
res := writer.GetResult()
147+
log.Info("uploaded state object",
148+
zap.String("object ID", res.StoredObjectID().String()),
149+
zap.Uint32("height", height),
150+
zap.Duration("time spent", duration))
151+
return nil
152+
}, maxRetries, debug)
153+
if err != nil {
154+
return cli.Exit(fmt.Sprintf("failed to upload object at height %d: %v", height, err), 1)
155+
}
156+
}
157+
return nil
158+
}
159+
160+
func searchStateIndex(ctx *cli.Context, p neofs.PoolWrapper, containerID cid.ID, privKeys *keys.PrivateKey,
161+
attributeKey string, syncInterval int, maxRetries uint, debug bool,
162+
) (int, error) {
163+
var (
164+
doneCh = make(chan struct{})
165+
errCh = make(chan error)
166+
objCount = 0
167+
)
168+
169+
go func() {
170+
defer close(doneCh)
171+
for i := 0; ; i++ {
172+
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys,
173+
attributeKey, uint(i*syncInterval), uint(i*syncInterval)+1, 1, maxRetries, debug, errCh)
174+
resOIDs := make([]oid.ID, 0, 1)
175+
for id := range indexIDs {
176+
resOIDs = append(resOIDs, id)
177+
}
178+
if len(resOIDs) == 0 {
179+
break
180+
}
181+
if len(resOIDs) > 1 {
182+
fmt.Fprintf(ctx.App.Writer, "WARN: %d duplicated state objects with %s: %d found: %s\n", len(resOIDs), attributeKey, i, resOIDs)
183+
}
184+
objCount++
185+
}
186+
}()
187+
select {
188+
case err := <-errCh:
189+
return objCount, err
190+
case <-doneCh:
191+
return objCount, nil
192+
}
193+
}
194+
195+
func traverseMPT(root util.Uint256, store storage.Store, writer *gio.BinWriter) error {
196+
cache := storage.NewMemCachedStore(store)
197+
billet := mpt.NewBillet(root, mpt.ModeAll, mpt.DummySTTempStoragePrefix, cache)
198+
err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
199+
writer.WriteVarBytes(nodeBytes)
200+
return writer.Err != nil
201+
}, false)
202+
if err != nil {
203+
return fmt.Errorf("billet traversal error: %w", err)
204+
}
205+
return nil
206+
}

‎docs/neofs-blockstorage.md

+25-3
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ parameter.
7373
Once all blocks available in the NeoFS container are processed, the service
7474
shuts down automatically.
7575

76-
### NeoFS Upload Command
77-
The `upload-bin` command is designed to fetch blocks from the RPC node and upload
76+
### NeoFS block uploading command
77+
The `util upload-bin` command is designed to fetch blocks from the RPC node and upload
7878
them to the NeoFS container. It also creates and uploads index files. Below is an
7979
example usage of the command:
8080

@@ -101,4 +101,26 @@ files are needed (different `index-file-size` or `index-attribute`), `upload-bin
101101
will upload the entire block sequence starting from genesis since no migration is
102102
supported yet by this command. Please, add a comment to the
103103
[#3744](https://github.com/nspcc-dev/neo-go/issues/3744) issue if you need this
104-
functionality.
104+
functionality.
105+
106+
### NeoFS state uploading command
107+
The `util upload-state` command is used to start a node, traverse the MPT over the
108+
smart contract storage, and upload MPT nodes to a NeoFS container at every
109+
`StateSyncInterval` number of blocks. Below is an example usage of the command:
110+
111+
```shell
112+
./bin/neo-go util upload-state --cid 9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG --wallet-config ./wallet-config.yml --state-attribute State -m -fsr st1.t5.fs.neo.org:8080 -fsr st2.t5.fs.neo.org:8080 -fsr st3.t5.fs.neo.org:8080
113+
```
114+
115+
Run `./bin/neo-go util upload-state --help` to see the full list of supported options.
116+
117+
This command works as follows:
118+
1. Searches for the state objects stored in NeoFS to find the latest uploaded object.
119+
2. Checks if new state objects could be uploaded given the current local state height.
120+
3. Traverses the MPT nodes (pre-order) starting from the stateroot at the height of the
121+
latest uploaded state object down to its children.
122+
4. Uploads the MPT nodes to the NeoFS container.
123+
5. Repeats steps 3-4 with a step equal to the `StateSyncInterval` number of blocks.
124+
125+
If the command is interrupted, it can be resumed. It starts the uploading process
126+
from the last uploaded state object.

‎pkg/core/blockchain.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ const (
6161
defaultTimePerBlock = 15 * time.Second
6262
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
6363
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
64-
defaultStateSyncInterval = 40000
64+
// DefaultStateSyncInterval is the default interval for state sync.
65+
DefaultStateSyncInterval = 40000
6566

6667
// defaultBlockTimesCache should be sufficient for tryRunGC() to get in
6768
// sync with storeBlock(). Most of the time they differ by some thousands of
@@ -310,7 +311,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
310311
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
311312
}
312313
if cfg.StateSyncInterval <= 0 {
313-
cfg.StateSyncInterval = defaultStateSyncInterval
314+
cfg.StateSyncInterval = DefaultStateSyncInterval
314315
log.Info("StateSyncInterval is not set or wrong, using default value",
315316
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
316317
}
@@ -320,7 +321,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
320321
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
321322
}
322323
if cfg.StateSyncInterval <= 0 {
323-
cfg.StateSyncInterval = defaultStateSyncInterval
324+
cfg.StateSyncInterval = DefaultStateSyncInterval
324325
log.Info("StateSyncInterval is not set or wrong, using default value",
325326
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
326327
}

‎pkg/services/helpers/neofs/blockstorage.go

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
DefaultBlockAttribute = "Block"
1919
// DefaultIndexFileAttribute is the default attribute name for index file objects.
2020
DefaultIndexFileAttribute = "Index"
21+
// DefaultStateAttribute is the default attribute name for state objects.
22+
DefaultStateAttribute = "State"
2123

2224
// DefaultSearchBatchSize is a number of objects to search in a batch. We need to
2325
// search with EQ filter to avoid partially-completed SEARCH responses. If EQ search
@@ -59,7 +61,7 @@
5961
}
6062

6163
// Close closes the pool and returns nil.
6264
func (p PoolWrapper) Close() error {
6365
p.Pool.Close()
6466
return nil
6567
}

0 commit comments

Comments
 (0)
Please sign in to comment.