Skip to content

Commit d30a9db

Browse files
committed
(BIDS-2330) misc: add command fix-blocks
1 parent 2bf7611 commit d30a9db

File tree

1 file changed

+212
-4
lines changed

1 file changed

+212
-4
lines changed

cmd/misc/main.go

+212-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"errors"
78
"eth2-exporter/db"
89
"eth2-exporter/exporter"
910
"eth2-exporter/rpc"
@@ -15,8 +16,10 @@ import (
1516
"math"
1617
"math/big"
1718
"net/http"
19+
"sort"
1820
"strconv"
1921
"strings"
22+
"sync"
2023
"time"
2124

2225
"github.com/coocood/freecache"
@@ -53,7 +56,7 @@ var opts = struct {
5356

5457
func main() {
5558
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
56-
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
59+
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, fix-exec-transactions-count, fix-blocks")
5760
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
5861
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
5962
flag.Uint64Var(&opts.User, "user", 0, "user id")
@@ -71,7 +74,7 @@ func main() {
7174
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
7275
flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})")
7376
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
74-
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
77+
flag.BoolVar(&opts.DryRun, "dry-run", true, "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
7578
versionFlag := flag.Bool("version", false, "Show version and exit")
7679
flag.Parse()
7780

@@ -81,8 +84,6 @@ func main() {
8184
return
8285
}
8386

84-
opts.DryRun = *dryRun != "false"
85-
8687
logrus.WithField("config", *configPath).WithField("version", version.Version).Printf("starting")
8788
cfg := &types.Config{}
8889
err := utils.ReadConfig(cfg, *configPath)
@@ -362,6 +363,8 @@ func main() {
362363
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
363364
case "fix-exec-transactions-count":
364365
err = fixExecTransactionsCount()
366+
case "fix-blocks":
367+
err = fixBlocks()
365368
default:
366369
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
367370
}
@@ -373,6 +376,211 @@ func main() {
373376
}
374377
}
375378

379+
func fixBlocks() error {
380+
logrus.WithFields(logrus.Fields{"startBlock": opts.StartBlock, "endBlock": opts.EndBlock, "dry": opts.DryRun}).Infof("running command fixBlocks")
381+
382+
type ClEthV1ConfigSpecResponse struct {
383+
Data map[string]string `json:"data"`
384+
}
385+
386+
type ClEthV2BeaconBlocksBlockResponse struct {
387+
Version string `json:"version"`
388+
ExecutionOptimistic bool `json:"execution_optimistic"`
389+
Finalized bool `json:"finalized"`
390+
Data struct {
391+
Message struct {
392+
Slot string `json:"slot"`
393+
}
394+
}
395+
}
396+
397+
spec := &ClEthV1ConfigSpecResponse{}
398+
err := utils.HttpReq(context.Background(), http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/config/spec", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), nil, spec)
399+
if err != nil {
400+
logrus.Fatal(err)
401+
}
402+
nodeDepositNetworkId, ok := spec.Data["DEPOSIT_NETWORK_ID"]
403+
if !ok {
404+
logrus.Fatal(fmt.Errorf("missing DEPOSIT_NETWORK_ID in spec from node"))
405+
}
406+
if fmt.Sprintf("%d", utils.Config.Chain.ClConfig.DepositNetworkID) != nodeDepositNetworkId {
407+
logrus.Fatal(fmt.Errorf("config.DepositNetworkId != node.DepositNetworkId: %v != %v", utils.Config.Chain.ClConfig.DepositNetworkID, nodeDepositNetworkId))
408+
}
409+
410+
start := opts.StartBlock
411+
end := opts.EndBlock
412+
if end == 0 {
413+
end = math.MaxUint64
414+
}
415+
416+
for i := start; i < end; i += 1000 {
417+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
418+
defer cancel()
419+
res := &ClEthV2BeaconBlocksBlockResponse{}
420+
err := utils.HttpReq(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/beacon/blocks/finalized", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), nil, res)
421+
if err != nil {
422+
return fmt.Errorf("error getting finalized block: %w", err)
423+
}
424+
425+
finalizedSlot, err := strconv.ParseUint(res.Data.Message.Slot, 10, 64)
426+
427+
iStart := i
428+
iEnd := i + 1000
429+
if iEnd > finalizedSlot+1 {
430+
iEnd = finalizedSlot + 1
431+
}
432+
433+
err = fixBlocksInRange(int(iStart), int(iEnd))
434+
if err != nil {
435+
return err
436+
}
437+
438+
if iEnd > finalizedSlot {
439+
return nil
440+
}
441+
}
442+
return nil
443+
}
444+
445+
func fixBlocksInRange(start, end int) error {
446+
type ClEthV2BeaconBlocksBlockRootResponse struct {
447+
ExecutionOptimistic bool `json:"execution_optimistic"`
448+
Finalized bool `json:"finalized"`
449+
Data struct {
450+
Root string `json:"root"`
451+
} `json:"data"`
452+
}
453+
454+
logrus.Infof("checking blocks [%v-%v)", start, end)
455+
456+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
457+
defer cancel()
458+
459+
var err error
460+
type DbBlock struct {
461+
Slot uint64 `db:"slot"`
462+
ExecBlockNumber uint64 `db:"exec_block_number"`
463+
BlockRoot []byte `db:"blockroot"`
464+
}
465+
type Block struct {
466+
Slot uint64
467+
DbBlocks []*DbBlock
468+
ClBlockRootReponse *ClEthV2BeaconBlocksBlockRootResponse
469+
}
470+
471+
dbBlocks := []*DbBlock{}
472+
err = db.WriterDb.Select(&dbBlocks, `SELECT slot, coalesce(exec_block_number,0) exec_block_number, blockroot FROM blocks WHERE status = '1' AND slot >= $1 AND slot < $2`, start, end)
473+
if err != nil {
474+
return fmt.Errorf("error getting dbBlocks: %w", err)
475+
}
476+
477+
blocksBySlotMu := sync.Mutex{}
478+
blocksBySlot := map[uint64]*Block{}
479+
480+
for _, dbBlock := range dbBlocks {
481+
block, exists := blocksBySlot[dbBlock.Slot]
482+
if !exists {
483+
block = &Block{Slot: dbBlock.Slot}
484+
blocksBySlot[dbBlock.Slot] = block
485+
}
486+
block.DbBlocks = append(block.DbBlocks, dbBlock)
487+
}
488+
489+
g, gCtx := errgroup.WithContext(ctx)
490+
g.SetLimit(4)
491+
492+
for slot := start; slot < end; slot++ {
493+
slot := slot
494+
g.Go(func() error {
495+
select {
496+
case <-gCtx.Done():
497+
return gCtx.Err()
498+
default:
499+
}
500+
res := &ClEthV2BeaconBlocksBlockRootResponse{}
501+
ctx, cancel := context.WithTimeout(gCtx, time.Second*10)
502+
defer cancel()
503+
var err error
504+
for i := 0; i < 10; i++ {
505+
if i > 0 {
506+
time.Sleep(time.Second * time.Duration(i))
507+
}
508+
err = utils.HttpReq(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/eth/v1/beacon/blocks/%d/root", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port, slot), nil, res)
509+
if err != nil {
510+
var httpErr *utils.HttpReqHttpError
511+
if errors.As(err, &httpErr) && httpErr.StatusCode == 404 {
512+
// no sidecar for this slot
513+
return nil
514+
}
515+
logrus.Errorf("error getting blockroot for slot %v: %v (attempt %v)", slot, err, i)
516+
}
517+
break
518+
}
519+
if err != nil {
520+
return err
521+
}
522+
blocksBySlotMu.Lock()
523+
block, exists := blocksBySlot[uint64(slot)]
524+
if !exists {
525+
block = &Block{Slot: uint64(slot)}
526+
blocksBySlot[uint64(slot)] = block
527+
}
528+
block.ClBlockRootReponse = res
529+
blocksBySlotMu.Unlock()
530+
return nil
531+
})
532+
}
533+
534+
err = g.Wait()
535+
if err != nil {
536+
return err
537+
}
538+
539+
blocksArr := []*Block{}
540+
for _, block := range blocksBySlot {
541+
blocksArr = append(blocksArr, block)
542+
}
543+
sort.Slice(blocksArr, func(i, j int) bool { return blocksArr[i].Slot < blocksArr[j].Slot })
544+
545+
for _, block := range blocksArr {
546+
if block.ClBlockRootReponse == nil {
547+
if opts.DryRun {
548+
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "dbBlocks": len(block.DbBlocks)}).Warnf("wrong blocks.status (not updateing because dry)")
549+
} else {
550+
for _, dbBlock := range block.DbBlocks {
551+
_, err = db.WriterDb.Exec(`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2`, dbBlock.Slot, dbBlock.BlockRoot)
552+
if err != nil {
553+
return fmt.Errorf("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w", block.Slot, dbBlock.BlockRoot, err)
554+
}
555+
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "blockroot": fmt.Sprintf("%#x", dbBlock.BlockRoot)}).Warnf("updated blocks.status to 3, because no block from cl")
556+
}
557+
}
558+
}
559+
560+
if len(block.DbBlocks) == 0 {
561+
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot)}).Warnf("missing block in db")
562+
}
563+
564+
if len(block.DbBlocks) > 0 && block.ClBlockRootReponse != nil {
565+
for _, dbBlock := range block.DbBlocks {
566+
if block.ClBlockRootReponse.Data.Root != fmt.Sprintf("%#x", dbBlock.BlockRoot) {
567+
if opts.DryRun {
568+
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot)}).Warnf("wrong blocks.status (not updateing because dry), missmatching blockroot at slot: %v: %s != %#x", block.Slot, block.ClBlockRootReponse.Data.Root, dbBlock.BlockRoot)
569+
} else {
570+
_, err = db.WriterDb.Exec(`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2`, dbBlock.Slot, dbBlock.BlockRoot)
571+
if err != nil {
572+
return fmt.Errorf("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w", block.Slot, dbBlock.BlockRoot, err)
573+
}
574+
logrus.WithFields(logrus.Fields{"slot": block.Slot, "epoch": utils.EpochOfSlot(block.Slot), "blockroot": fmt.Sprintf("%#x", dbBlock.BlockRoot)}).Warnf("updated blocks.status to 3, because missmatching blockroot at slot: %v: %s != %#x", block.Slot, block.ClBlockRootReponse.Data.Root, dbBlock.BlockRoot)
575+
}
576+
}
577+
}
578+
}
579+
}
580+
581+
return nil
582+
}
583+
376584
func fixExecTransactionsCount() error {
377585
startBlockNumber := uint64(opts.StartBlock)
378586
endBlockNumber := uint64(opts.EndBlock)

0 commit comments

Comments
 (0)