4
4
"bytes"
5
5
"context"
6
6
"encoding/json"
7
+ "errors"
7
8
"eth2-exporter/db"
8
9
"eth2-exporter/exporter"
9
10
"eth2-exporter/rpc"
@@ -15,8 +16,10 @@ import (
15
16
"math"
16
17
"math/big"
17
18
"net/http"
19
+ "sort"
18
20
"strconv"
19
21
"strings"
22
+ "sync"
20
23
"time"
21
24
22
25
"github.com/coocood/freecache"
@@ -53,7 +56,7 @@ var opts = struct {
53
56
54
57
func main () {
55
58
configPath := flag .String ("config" , "config/default.config.yml" , "Path to the config file" )
56
- flag .StringVar (& opts .Command , "command" , "" , "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals" )
59
+ flag .StringVar (& opts .Command , "command" , "" , "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, fix-exec-transactions-count, fix-blocks " )
57
60
flag .Uint64Var (& opts .StartEpoch , "start-epoch" , 0 , "start epoch" )
58
61
flag .Uint64Var (& opts .EndEpoch , "end-epoch" , 0 , "end epoch" )
59
62
flag .Uint64Var (& opts .User , "user" , 0 , "user id" )
@@ -71,7 +74,7 @@ func main() {
71
74
flag .StringVar (& opts .Transformers , "transformers" , "" , "Comma separated list of transformers used by the eth1 indexer" )
72
75
flag .StringVar (& opts .ValidatorNameRanges , "validator-name-ranges" , "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges" , "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})" )
73
76
flag .StringVar (& opts .Columns , "columns" , "" , "Comma separated list of columns that should be affected by the command" )
74
- dryRun := flag .String ( "dry-run" , " true" , "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them" )
77
+ flag .BoolVar ( & opts . DryRun , "dry-run" , true , "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them" )
75
78
versionFlag := flag .Bool ("version" , false , "Show version and exit" )
76
79
flag .Parse ()
77
80
@@ -81,8 +84,6 @@ func main() {
81
84
return
82
85
}
83
86
84
- opts .DryRun = * dryRun != "false"
85
-
86
87
logrus .WithField ("config" , * configPath ).WithField ("version" , version .Version ).Printf ("starting" )
87
88
cfg := & types.Config {}
88
89
err := utils .ReadConfig (cfg , * configPath )
@@ -362,6 +363,8 @@ func main() {
362
363
exportStatsTotals (opts .Columns , opts .StartDay , opts .EndDay , opts .DataConcurrency )
363
364
case "fix-exec-transactions-count" :
364
365
err = fixExecTransactionsCount ()
366
+ case "fix-blocks" :
367
+ err = fixBlocks ()
365
368
default :
366
369
utils .LogFatal (nil , fmt .Sprintf ("unknown command %s" , opts .Command ), 0 )
367
370
}
@@ -373,6 +376,201 @@ func main() {
373
376
}
374
377
}
375
378
379
+ func fixBlocks () error {
380
+ logrus .WithFields (logrus.Fields {"startBlock" : opts .StartBlock , "endBlock" : opts .EndBlock , "dry" : opts .DryRun }).Infof ("running command fixBlocks" )
381
+
382
+ type ClEthV1ConfigSpecResponse struct {
383
+ Data map [string ]string `json:"data"`
384
+ }
385
+
386
+ type ClEthV2BeaconBlocksBlockResponse struct {
387
+ Version string `json:"version"`
388
+ ExecutionOptimistic bool `json:"execution_optimistic"`
389
+ Finalized bool `json:"finalized"`
390
+ Data struct {
391
+ Message struct {
392
+ Slot string `json:"slot"`
393
+ }
394
+ }
395
+ }
396
+
397
+ spec := & ClEthV1ConfigSpecResponse {}
398
+ err := utils .HttpReq (context .Background (), http .MethodGet , fmt .Sprintf ("http://%s:%s/eth/v1/config/spec" , utils .Config .Indexer .Node .Host , utils .Config .Indexer .Node .Port ), nil , spec )
399
+ if err != nil {
400
+ logrus .Fatal (err )
401
+ }
402
+ nodeDepositNetworkId , ok := spec .Data ["DEPOSIT_NETWORK_ID" ]
403
+ if ! ok {
404
+ logrus .Fatal (fmt .Errorf ("missing DEPOSIT_NETWORK_ID in spec from node" ))
405
+ }
406
+ if fmt .Sprintf ("%d" , utils .Config .Chain .ClConfig .DepositNetworkID ) != nodeDepositNetworkId {
407
+ logrus .Fatal (fmt .Errorf ("config.DepositNetworkId != node.DepositNetworkId: %v != %v" , utils .Config .Chain .ClConfig .DepositNetworkID , nodeDepositNetworkId ))
408
+ }
409
+
410
+ start := opts .StartBlock
411
+ end := opts .EndBlock
412
+ if end == 0 {
413
+ end = math .MaxUint64
414
+ }
415
+
416
+ for i := start ; i < end ; i += 1000 {
417
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
418
+ defer cancel ()
419
+ res := & ClEthV2BeaconBlocksBlockResponse {}
420
+ err := utils .HttpReq (ctx , http .MethodGet , fmt .Sprintf ("http://%s:%s/eth/v1/beacon/blocks/finalized" , utils .Config .Indexer .Node .Host , utils .Config .Indexer .Node .Port ), nil , res )
421
+ if err != nil {
422
+ return fmt .Errorf ("error getting finalized block: %w" , err )
423
+ }
424
+
425
+ finalizedSlot , err := strconv .ParseUint (res .Data .Message .Slot , 10 , 64 )
426
+
427
+ iStart := i
428
+ iEnd := i + 1000
429
+ if iEnd > finalizedSlot + 1 {
430
+ iEnd = finalizedSlot + 1
431
+ }
432
+
433
+ err = fixBlocksInRange (int (iStart ), int (iEnd ))
434
+ if err != nil {
435
+ return err
436
+ }
437
+
438
+ if iEnd > finalizedSlot {
439
+ return nil
440
+ }
441
+ }
442
+ return nil
443
+ }
444
+
445
+ func fixBlocksInRange (start , end int ) error {
446
+ type ClEthV2BeaconBlocksBlockRootResponse struct {
447
+ ExecutionOptimistic bool `json:"execution_optimistic"`
448
+ Finalized bool `json:"finalized"`
449
+ Data struct {
450
+ Root string `json:"root"`
451
+ } `json:"data"`
452
+ }
453
+
454
+ logrus .Infof ("checking blocks [%v-%v)" , start , end )
455
+
456
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 300 )
457
+ defer cancel ()
458
+
459
+ var err error
460
+ type DbBlock struct {
461
+ Slot uint64 `db:"slot"`
462
+ ExecBlockNumber uint64 `db:"exec_block_number"`
463
+ BlockRoot []byte `db:"blockroot"`
464
+ }
465
+ type Block struct {
466
+ Slot uint64
467
+ DbBlocks []* DbBlock
468
+ ClBlockRootReponse * ClEthV2BeaconBlocksBlockRootResponse
469
+ }
470
+
471
+ dbBlocks := []* DbBlock {}
472
+ err = db .WriterDb .Select (& dbBlocks , `SELECT slot, coalesce(exec_block_number,0) exec_block_number, blockroot FROM blocks WHERE status = '1' AND slot >= $1 AND slot < $2` , start , end )
473
+ if err != nil {
474
+ return fmt .Errorf ("error getting dbBlocks: %w" , err )
475
+ }
476
+
477
+ blocksBySlotMu := sync.Mutex {}
478
+ blocksBySlot := map [uint64 ]* Block {}
479
+
480
+ for _ , dbBlock := range dbBlocks {
481
+ block , exists := blocksBySlot [dbBlock .Slot ]
482
+ if ! exists {
483
+ block = & Block {Slot : dbBlock .Slot }
484
+ blocksBySlot [dbBlock .Slot ] = block
485
+ }
486
+ block .DbBlocks = append (block .DbBlocks , dbBlock )
487
+ }
488
+
489
+ g , gCtx := errgroup .WithContext (ctx )
490
+ g .SetLimit (4 )
491
+
492
+ for slot := start ; slot < end ; slot ++ {
493
+ slot := slot
494
+ g .Go (func () error {
495
+ select {
496
+ case <- gCtx .Done ():
497
+ return gCtx .Err ()
498
+ default :
499
+ }
500
+ res := & ClEthV2BeaconBlocksBlockRootResponse {}
501
+ ctx , cancel := context .WithTimeout (gCtx , time .Second * 10 )
502
+ defer cancel ()
503
+ err = utils .HttpReq (ctx , http .MethodGet , fmt .Sprintf ("http://%s:%s/eth/v1/beacon/blocks/%d/root" , utils .Config .Indexer .Node .Host , utils .Config .Indexer .Node .Port , slot ), nil , res )
504
+ if err != nil {
505
+ var httpErr * utils.HttpReqHttpError
506
+ if errors .As (err , & httpErr ) && httpErr .StatusCode == 404 {
507
+ // no sidecar for this slot
508
+ return nil
509
+ }
510
+ return err
511
+ }
512
+ blocksBySlotMu .Lock ()
513
+ block , exists := blocksBySlot [uint64 (slot )]
514
+ if ! exists {
515
+ block = & Block {Slot : uint64 (slot )}
516
+ blocksBySlot [uint64 (slot )] = block
517
+ }
518
+ block .ClBlockRootReponse = res
519
+ blocksBySlotMu .Unlock ()
520
+ return nil
521
+ })
522
+ }
523
+
524
+ err = g .Wait ()
525
+ if err != nil {
526
+ return err
527
+ }
528
+
529
+ blocksArr := []* Block {}
530
+ for _ , block := range blocksBySlot {
531
+ blocksArr = append (blocksArr , block )
532
+ }
533
+ sort .Slice (blocksArr , func (i , j int ) bool { return blocksArr [i ].Slot < blocksArr [j ].Slot })
534
+
535
+ for _ , block := range blocksArr {
536
+ if block .ClBlockRootReponse == nil {
537
+ if opts .DryRun {
538
+ logrus .WithFields (logrus.Fields {"slot" : block .Slot , "epoch" : utils .EpochOfSlot (block .Slot ), "dbBlocks" : len (block .DbBlocks )}).Warnf ("wrong blocks.status (not updateing because dry)" )
539
+ } else {
540
+ for _ , dbBlock := range block .DbBlocks {
541
+ _ , err = db .WriterDb .Exec (`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2` , dbBlock .Slot , dbBlock .BlockRoot )
542
+ if err != nil {
543
+ return fmt .Errorf ("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w" , block .Slot , dbBlock .BlockRoot , err )
544
+ }
545
+ logrus .WithFields (logrus.Fields {"slot" : block .Slot , "epoch" : utils .EpochOfSlot (block .Slot ), "blockroot" : fmt .Sprintf ("%#x" , dbBlock .BlockRoot )}).Warnf ("updated blocks.status to 3, because no block from cl" )
546
+ }
547
+ }
548
+ }
549
+
550
+ if len (block .DbBlocks ) == 0 {
551
+ logrus .WithFields (logrus.Fields {"slot" : block .Slot , "epoch" : utils .EpochOfSlot (block .Slot )}).Warnf ("missing block in db" )
552
+ }
553
+
554
+ if len (block .DbBlocks ) > 0 && block .ClBlockRootReponse != nil {
555
+ for _ , dbBlock := range block .DbBlocks {
556
+ if block .ClBlockRootReponse .Data .Root != fmt .Sprintf ("%#x" , dbBlock .BlockRoot ) {
557
+ if opts .DryRun {
558
+ logrus .WithFields (logrus.Fields {"slot" : block .Slot , "epoch" : utils .EpochOfSlot (block .Slot )}).Warnf ("wrong blocks.status (not updateing because dry), missmatching blockroot at slot: %v: %s != %#x" , block .Slot , block .ClBlockRootReponse .Data .Root , dbBlock .BlockRoot )
559
+ } else {
560
+ _ , err = db .WriterDb .Exec (`UPDATE blocks SET status = '3' WHERE slot = $1 AND blockroot = $2` , dbBlock .Slot , dbBlock .BlockRoot )
561
+ if err != nil {
562
+ return fmt .Errorf ("error updating blocks.status to 3 for block at slot %v with blockroot %#x: %w" , block .Slot , dbBlock .BlockRoot , err )
563
+ }
564
+ logrus .WithFields (logrus.Fields {"slot" : block .Slot , "epoch" : utils .EpochOfSlot (block .Slot ), "blockroot" : fmt .Sprintf ("%#x" , dbBlock .BlockRoot )}).Warnf ("updated blocks.status to 3, because missmatching blockroot at slot: %v: %s != %#x" , block .Slot , block .ClBlockRootReponse .Data .Root , dbBlock .BlockRoot )
565
+ }
566
+ }
567
+ }
568
+ }
569
+ }
570
+
571
+ return nil
572
+ }
573
+
376
574
func fixExecTransactionsCount () error {
377
575
startBlockNumber := uint64 (opts .StartBlock )
378
576
endBlockNumber := uint64 (opts .EndBlock )
0 commit comments