From 537b641861d22691e3b8073c5e9e7f2bdd3f0b28 Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 25 Oct 2024 19:08:17 -0400 Subject: [PATCH] feat: use new chainsyncer keyper: add contract-addresses to config keyper-core: implement DB chaincache gnosis-keyper: use new chainsyncer gnosis-keyper: remove old sync-handler gnosisaccessnode: use new chainsyncer gnosisaccessnode: move storage to package gnosis-keyper: Rename some functions and types gnosis-keyper: refactor keyperCore init func gnosis-keyper: Pass dbpool to keyper core keyper: implement SyncStartBlock / DatabaseChainCache initial syncing gnosis-keyper: Add int64 overflow checks gnosis-keyper: add new synchandler chore: avoid var shadow feat(chainsync): add retry in initial block fetch chore: rename keypersetmanager synchandler chore: re-use the gnosis eth-client in keyper-core chore: refine comments chore: remove unnecessary import chore: fix generate, gci, gofumpt --- .../db/keyper/keyper.sqlc.gen.go | 137 +++++++++ .../db/keyper/sql/queries/keyper.sql | 40 ++- .../eonkeypublisher/eonkeypublisher.go | 2 + .../gnosisaccessnode/decryptionkeyshandler.go | 7 +- rolling-shutter/gnosisaccessnode/node.go | 88 +++--- .../gnosisaccessnode/{ => storage}/storage.go | 16 +- .../keyper/chaincache/chaincache.go | 143 ++++++++++ .../keyper/epochkghandler/service.go | 4 +- rolling-shutter/keyper/keyper.go | 76 ++++- rolling-shutter/keyper/kprconfig/config.go | 5 + rolling-shutter/keyper/options.go | 62 +++- .../gnosis/database/gnosiskeyper.sqlc.gen.go | 33 --- .../database/sql/queries/gnosiskeyper.sql | 7 - rolling-shutter/keyperimpl/gnosis/keyper.go | 257 +++++++---------- rolling-shutter/keyperimpl/gnosis/newblock.go | 26 -- .../keyperimpl/gnosis/neweonpublickey.go | 12 - .../keyperimpl/gnosis/newkeyperset.go | 55 ---- rolling-shutter/keyperimpl/gnosis/newslot.go | 56 ++-- .../keyperimpl/gnosis/sequencersyncer.go | 269 ------------------ 19 files changed, 641 insertions(+), 654 deletions(-) rename rolling-shutter/gnosisaccessnode/{ => storage}/storage.go (63%) create mode 100644 rolling-shutter/keyper/chaincache/chaincache.go delete mode 100644 rolling-shutter/keyperimpl/gnosis/newblock.go delete mode 100644 rolling-shutter/keyperimpl/gnosis/neweonpublickey.go delete mode 100644 rolling-shutter/keyperimpl/gnosis/newkeyperset.go delete mode 100644 rolling-shutter/keyperimpl/gnosis/sequencersyncer.go diff --git a/rolling-shutter/chainobserver/db/keyper/keyper.sqlc.gen.go b/rolling-shutter/chainobserver/db/keyper/keyper.sqlc.gen.go index 88b93bc14..dbe1ab759 100644 --- a/rolling-shutter/chainobserver/db/keyper/keyper.sqlc.gen.go +++ b/rolling-shutter/chainobserver/db/keyper/keyper.sqlc.gen.go @@ -9,6 +9,26 @@ import ( "context" ) +const deleteSyncedBlockByHash = `-- name: DeleteSyncedBlockByHash :exec +DELETE FROM recent_block +WHERE block_hash = $1 +` + +func (q *Queries) DeleteSyncedBlockByHash(ctx context.Context, blockHash []byte) error { + _, err := q.db.Exec(ctx, deleteSyncedBlockByHash, blockHash) + return err +} + +const evictSyncedBlocksBefore = `-- name: EvictSyncedBlocksBefore :exec +DELETE FROM recent_block +WHERE block_number < $1 +` + +func (q *Queries) EvictSyncedBlocksBefore(ctx context.Context, blockNumber int64) error { + _, err := q.db.Exec(ctx, evictSyncedBlocksBefore, blockNumber) + return err +} + const getKeyperSet = `-- name: GetKeyperSet :one SELECT keyper_config_index, activation_block_number, keypers, threshold FROM keyper_set WHERE activation_block_number <= $1 @@ -73,6 +93,87 @@ func (q *Queries) GetKeyperSets(ctx context.Context) ([]KeyperSet, error) { return items, nil } +const getLatestSyncedBlocks = `-- name: GetLatestSyncedBlocks :many +SELECT block_hash, block_number, parent_hash, timestamp, header FROM recent_block +ORDER BY block_number DESC +LIMIT $1 +` + +func (q *Queries) GetLatestSyncedBlocks(ctx context.Context, limit int32) ([]RecentBlock, error) { + rows, err := q.db.Query(ctx, getLatestSyncedBlocks, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []RecentBlock + for rows.Next() { + var i RecentBlock + if err := rows.Scan( + &i.BlockHash, + &i.BlockNumber, + &i.ParentHash, + &i.Timestamp, + &i.Header, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getSyncedBlockByHash = `-- name: GetSyncedBlockByHash :one +SELECT block_hash, block_number, parent_hash, timestamp, header FROM recent_block +WHERE block_hash = $1 +` + +func (q *Queries) GetSyncedBlockByHash(ctx context.Context, blockHash []byte) (RecentBlock, error) { + row := q.db.QueryRow(ctx, getSyncedBlockByHash, blockHash) + var i RecentBlock + err := row.Scan( + &i.BlockHash, + &i.BlockNumber, + &i.ParentHash, + &i.Timestamp, + &i.Header, + ) + return i, err +} + +const getSyncedBlocks = `-- name: GetSyncedBlocks :many +SELECT block_hash, block_number, parent_hash, timestamp, header FROM recent_block +ORDER BY block_number DESC +` + +func (q *Queries) GetSyncedBlocks(ctx context.Context) ([]RecentBlock, error) { + rows, err := q.db.Query(ctx, getSyncedBlocks) + if err != nil { + return nil, err + } + defer rows.Close() + var items []RecentBlock + for rows.Next() { + var i RecentBlock + if err := rows.Scan( + &i.BlockHash, + &i.BlockNumber, + &i.ParentHash, + &i.Timestamp, + &i.Header, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertKeyperSet = `-- name: InsertKeyperSet :exec INSERT INTO keyper_set ( keyper_config_index, @@ -100,3 +201,39 @@ func (q *Queries) InsertKeyperSet(ctx context.Context, arg InsertKeyperSetParams ) return err } + +const insertSyncedBlock = `-- name: InsertSyncedBlock :exec +INSERT INTO recent_block ( + block_hash, + block_number, + parent_hash, + timestamp, + header +) VALUES ( + $1, $2, $3, $4, $5 +) ON CONFLICT DO UPDATE SET + block_hash = $1, + block_number =$2 , + parent_hash =$3, + timestamp =$4 , + header =$5 +` + +type InsertSyncedBlockParams struct { + BlockHash []byte + BlockNumber int64 + ParentHash []byte + Timestamp int64 + Header []byte +} + +func (q *Queries) InsertSyncedBlock(ctx context.Context, arg InsertSyncedBlockParams) error { + _, err := q.db.Exec(ctx, insertSyncedBlock, + arg.BlockHash, + arg.BlockNumber, + arg.ParentHash, + arg.Timestamp, + arg.Header, + ) + return err +} diff --git a/rolling-shutter/chainobserver/db/keyper/sql/queries/keyper.sql b/rolling-shutter/chainobserver/db/keyper/sql/queries/keyper.sql index 90b386425..c95c61137 100644 --- a/rolling-shutter/chainobserver/db/keyper/sql/queries/keyper.sql +++ b/rolling-shutter/chainobserver/db/keyper/sql/queries/keyper.sql @@ -18,4 +18,42 @@ ORDER BY activation_block_number DESC LIMIT 1; -- name: GetKeyperSets :many SELECT * FROM keyper_set -ORDER BY activation_block_number ASC; \ No newline at end of file +ORDER BY activation_block_number ASC; + +-- name: InsertSyncedBlock :exec +INSERT INTO recent_block ( + block_hash, + block_number, + parent_hash, + timestamp, + header +) VALUES ( + $1, $2, $3, $4, $5 +) ON CONFLICT DO UPDATE SET + block_hash = $1, + block_number =$2 , + parent_hash =$3, + timestamp =$4 , + header =$5 +; + +-- name: GetSyncedBlockByHash :one +SELECT * FROM recent_block +WHERE block_hash = $1; + +-- name: DeleteSyncedBlockByHash :exec +DELETE FROM recent_block +WHERE block_hash = $1; + +-- name: GetSyncedBlocks :many +SELECT * FROM recent_block +ORDER BY block_number DESC; + +-- name: GetLatestSyncedBlocks :many +SELECT * FROM recent_block +ORDER BY block_number DESC +LIMIT $1; + +-- name: EvictSyncedBlocksBefore :exec +DELETE FROM recent_block +WHERE block_number < $1; diff --git a/rolling-shutter/eonkeypublisher/eonkeypublisher.go b/rolling-shutter/eonkeypublisher/eonkeypublisher.go index e6b88d4b7..b73982bc5 100644 --- a/rolling-shutter/eonkeypublisher/eonkeypublisher.go +++ b/rolling-shutter/eonkeypublisher/eonkeypublisher.go @@ -29,6 +29,8 @@ const ( ) // EonKeyPublisher is a service that publishes eon keys via a eon key publisher contract. +// E.g. in the Gnosis keyper implementation this is used to publish keys onchain +// instead of broadcasting it on the p2p-network. type EonKeyPublisher struct { dbpool *pgxpool.Pool client *ethclient.Client diff --git a/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go b/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go index 230ddecf6..4f3aed677 100644 --- a/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go +++ b/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go @@ -10,19 +10,20 @@ import ( "github.com/shutter-network/shutter/shlib/shcrypto" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2pmsg" ) type DecryptionKeysHandler struct { config *Config - storage *Storage + storage *storage.Memory } -func NewDecryptionKeysHandler(config *Config, storage *Storage) *DecryptionKeysHandler { +func NewDecryptionKeysHandler(config *Config, store *storage.Memory) *DecryptionKeysHandler { return &DecryptionKeysHandler{ config: config, - storage: storage, + storage: store, } } diff --git a/rolling-shutter/gnosisaccessnode/node.go b/rolling-shutter/gnosisaccessnode/node.go index 277a3dfb3..f179e4f3f 100644 --- a/rolling-shutter/gnosisaccessnode/node.go +++ b/rolling-shutter/gnosisaccessnode/node.go @@ -2,30 +2,28 @@ package gnosisaccessnode import ( "context" + "fmt" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" - "github.com/rs/zerolog/log" - "github.com/shutter-network/shutter/shlib/shcrypto" - - obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" - chainsync "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync" - syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync/event" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/synchandler" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" - "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" ) type GnosisAccessNode struct { config *Config - storage *Storage + storage *storage.Memory } func New(config *Config) *GnosisAccessNode { return &GnosisAccessNode{ config: config, - storage: NewStorage(), + storage: storage.NewMemory(), } } @@ -39,19 +37,35 @@ func (node *GnosisAccessNode) Start(ctx context.Context, runner service.Runner) messageSender.AddMessageHandler(NewDecryptionKeysHandler(node.config, node.storage)) services = append(services, messageSender) - chainSyncClient, err := chainsync.NewClient( - ctx, - chainsync.WithClientURL(node.config.GnosisNode.EthereumURL), - chainsync.WithKeyperSetManager(node.config.Contracts.KeyperSetManager), - chainsync.WithKeyBroadcastContract(node.config.Contracts.KeyBroadcastContract), - chainsync.WithSyncNewKeyperSet(node.onNewKeyperSet), - chainsync.WithSyncNewEonKey(node.onNewEonKey), + ethClient, err := ethclient.DialContext(ctx, node.config.GnosisNode.EthereumURL) + if err != nil { + return err + } + keyperSetAdded, err := synchandler.NewKeyperSetAdded( + ethClient, + node.storage, + node.config.Contracts.KeyperSetManager, ) if err != nil { - return errors.Wrap(err, "failed to initialize chain sync client") + return err } - services = append(services, chainSyncClient) - + eonKeyBroadcast, err := synchandler.NewEonKeyBroadcast( + node.storage, + node.config.Contracts.KeyBroadcastContract, + ) + if err != nil { + return err + } + chainsyncOpts := []chainsync.Option{ + chainsync.WithClient(ethClient), + chainsync.WithContractEventHandler(keyperSetAdded), + chainsync.WithContractEventHandler(eonKeyBroadcast), + } + chainsyncer, err := chainsync.New(chainsyncOpts...) + if err != nil { + return fmt.Errorf("can't instantiate chainsync: %w", err) + } + services = append(services, chainsyncer) if node.config.Metrics.Enabled { metricsServer := metricsserver.New(node.config.Metrics) services = append(services, metricsServer) @@ -59,39 +73,3 @@ func (node *GnosisAccessNode) Start(ctx context.Context, runner service.Runner) return runner.StartService(services...) } - -func (node *GnosisAccessNode) onNewKeyperSet(_ context.Context, keyperSet *syncevent.KeyperSet) error { - obsKeyperSet := obskeyperdatabase.KeyperSet{ - KeyperConfigIndex: int64(keyperSet.Eon), - ActivationBlockNumber: int64(keyperSet.ActivationBlock), - Keypers: shdb.EncodeAddresses(keyperSet.Members), - Threshold: int32(keyperSet.Threshold), - } - log.Info(). - Uint64("keyper-config-index", keyperSet.Eon). - Uint64("activation-block-number", keyperSet.ActivationBlock). - Int("num-keypers", len(keyperSet.Members)). - Uint64("threshold", keyperSet.Threshold). - Msg("adding keyper set") - node.storage.AddKeyperSet(keyperSet.Eon, &obsKeyperSet) - return nil -} - -func (node *GnosisAccessNode) onNewEonKey(_ context.Context, eonKey *syncevent.EonPublicKey) error { - key := new(shcrypto.EonPublicKey) - err := key.Unmarshal(eonKey.Key) - if err != nil { - log.Error(). - Err(err). - Hex("key", eonKey.Key). - Int("keyper-config-index", int(eonKey.Eon)). - Msg("received invalid eon key") - return nil - } - log.Info(). - Int("keyper-config-index", int(eonKey.Eon)). - Hex("key", eonKey.Key). - Msg("adding eon key") - node.storage.AddEonKey(eonKey.Eon, key) - return nil -} diff --git a/rolling-shutter/gnosisaccessnode/storage.go b/rolling-shutter/gnosisaccessnode/storage/storage.go similarity index 63% rename from rolling-shutter/gnosisaccessnode/storage.go rename to rolling-shutter/gnosisaccessnode/storage/storage.go index c03ecf808..986f08c52 100644 --- a/rolling-shutter/gnosisaccessnode/storage.go +++ b/rolling-shutter/gnosisaccessnode/storage/storage.go @@ -1,4 +1,4 @@ -package gnosisaccessnode +package storage import ( "sync" @@ -8,28 +8,28 @@ import ( obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" ) -type Storage struct { +type Memory struct { mu sync.Mutex eonKeys map[uint64]*shcrypto.EonPublicKey keyperSets map[uint64]*obskeyperdatabase.KeyperSet } -func NewStorage() *Storage { - return &Storage{ +func NewMemory() *Memory { + return &Memory{ mu: sync.Mutex{}, eonKeys: make(map[uint64]*shcrypto.EonPublicKey), keyperSets: make(map[uint64]*obskeyperdatabase.KeyperSet), } } -func (s *Storage) AddEonKey(keyperConfigIndex uint64, key *shcrypto.EonPublicKey) { +func (s *Memory) AddEonKey(keyperConfigIndex uint64, key *shcrypto.EonPublicKey) { s.mu.Lock() defer s.mu.Unlock() s.eonKeys[keyperConfigIndex] = key } -func (s *Storage) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, bool) { +func (s *Memory) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -37,14 +37,14 @@ func (s *Storage) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, b return v, ok } -func (s *Storage) AddKeyperSet(keyperConfigIndex uint64, keyperSet *obskeyperdatabase.KeyperSet) { +func (s *Memory) AddKeyperSet(keyperConfigIndex uint64, keyperSet *obskeyperdatabase.KeyperSet) { s.mu.Lock() defer s.mu.Unlock() s.keyperSets[keyperConfigIndex] = keyperSet } -func (s *Storage) GetKeyperSet(keyperConfigIndex uint64) (*obskeyperdatabase.KeyperSet, bool) { +func (s *Memory) GetKeyperSet(keyperConfigIndex uint64) (*obskeyperdatabase.KeyperSet, bool) { s.mu.Lock() defer s.mu.Unlock() diff --git a/rolling-shutter/keyper/chaincache/chaincache.go b/rolling-shutter/keyper/chaincache/chaincache.go new file mode 100644 index 000000000..a7f881fe8 --- /dev/null +++ b/rolling-shutter/keyper/chaincache/chaincache.go @@ -0,0 +1,143 @@ +package chaincache + +import ( + "context" + "errors" + "fmt" + "math" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + + database "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/chainsegment" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" +) + +const MaxSyncedBlockCacheSize = 100 + +var _ syncer.ChainCache = &DatabaseChainCache{} + +func NewDatabaseChainCache(db *pgxpool.Pool) *DatabaseChainCache { + return &DatabaseChainCache{ + dbpool: db, + } +} + +type DatabaseChainCache struct { + dbpool *pgxpool.Pool +} + +func EncodeHeader(h *types.Header) ([]byte, error) { + return rlp.EncodeToBytes(h) +} + +func DecodeHeader(b []byte) (*types.Header, error) { + h := new(types.Header) + err := rlp.DecodeBytes(b, h) + if err != nil { + return nil, err + } + return h, nil +} + +func (f *DatabaseChainCache) Get(ctx context.Context) (*chainsegment.ChainSegment, error) { + headers := []*types.Header{} + err := f.dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { + q := database.New(tx) + blocks, err := q.GetSyncedBlocks(ctx) + if err != nil { + return err + } + // TODO: sanity check hashes / parent hashes + for _, block := range blocks { + h, err := DecodeHeader(block.Header) + if err != nil { + return fmt.Errorf("error decoding header: %w", err) + } + headers = append(headers, h) + } + return nil + }) + if err != nil { + return nil, err + } + if len(headers) == 0 { + return nil, syncer.ErrEmpy + } + return chainsegment.NewChainSegment(headers...), nil +} + +func (f *DatabaseChainCache) GetHeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) { + var header *types.Header + err := f.dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { + q := database.New(tx) + block, err := q.GetSyncedBlockByHash(ctx, blockHash.Bytes()) + if err != nil { + return err + } + header, err = DecodeHeader(block.Header) + return err + }) + if err != nil && err != pgx.ErrNoRows { + return nil, fmt.Errorf("failed to query block header from database cache: %w", err) + } + if err == pgx.ErrNoRows { + // we don't have the header cached + err = nil + } + return header, err +} + +func (f *DatabaseChainCache) Update(ctx context.Context, update syncer.ChainUpdateContext) error { + return f.dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { + q := database.New(tx) + for _, header := range update.Remove.Get() { + // Not strictly necessary if the QueryContext.Delete + // have the corresponding reorged blocks in QueryContext.Update + err := q.DeleteSyncedBlockByHash(ctx, header.Hash().Bytes()) + if err != nil { + return err + } + } + updateHeader := update.Append.Get() + for _, header := range updateHeader { + h, err := EncodeHeader(header) + if err != nil { + return err + } + if !header.Number.IsInt64() { + return fmt.Errorf("block number int64 overflow") + } + if header.Time > math.MaxInt64 { + return errors.New("block time int64 overflow") + } + if err := q.InsertSyncedBlock(ctx, + database.InsertSyncedBlockParams{ + BlockHash: header.Hash().Bytes(), + ParentHash: header.ParentHash.Bytes(), + BlockNumber: header.Number.Int64(), + Timestamp: int64(header.Time), + Header: h, + }); err != nil { + return err + } + } + + if len(updateHeader) != 0 { + earliestBlockNum := updateHeader[0].Number + if !earliestBlockNum.IsInt64() { + return fmt.Errorf("earliest block number int64 overflow") + } + evictBefore := earliestBlockNum.Int64() - MaxSyncedBlockCacheSize + err := q.EvictSyncedBlocksBefore(ctx, evictBefore) + if err != nil { + return err + } + } + return nil + }) +} diff --git a/rolling-shutter/keyper/epochkghandler/service.go b/rolling-shutter/keyper/epochkghandler/service.go index 29a7fbaee..517de9e5c 100644 --- a/rolling-shutter/keyper/epochkghandler/service.go +++ b/rolling-shutter/keyper/epochkghandler/service.go @@ -27,7 +27,7 @@ type KeyShareHandler struct { Trigger <-chan *broker.Event[*DecryptionTrigger] } -func (ksh *KeyShareHandler) handleEvent(ctx context.Context, ev *broker.Event[*DecryptionTrigger]) { +func (ksh *KeyShareHandler) handleDecryptionTrigger(ctx context.Context, ev *broker.Event[*DecryptionTrigger]) { var err error defer func() { if err != nil { @@ -85,7 +85,7 @@ func (ksh *KeyShareHandler) Start(ctx context.Context, group service.Runner) err log.Debug().Msg("decryption trigger channel closed, stopping loop") return nil } - ksh.handleEvent(ctx, triggerEvent) + ksh.handleDecryptionTrigger(ctx, triggerEvent) case <-ctx.Done(): return ctx.Err() } diff --git a/rolling-shutter/keyper/keyper.go b/rolling-shutter/keyper/keyper.go index f417ae4c1..c16351976 100644 --- a/rolling-shutter/keyper/keyper.go +++ b/rolling-shutter/keyper/keyper.go @@ -3,8 +3,11 @@ package keyper import ( "context" "fmt" + "math/big" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" @@ -15,6 +18,7 @@ import ( obskeyper "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/contract/deployment" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/chaincache" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/fx" @@ -22,8 +26,12 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprapi" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/smobserver" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/synchandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/chainsegment" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/channel" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" @@ -44,6 +52,7 @@ type KeyperCore struct { messaging p2p.Messaging messageSender fx.RPCMessageSender blockSyncClient *ethclient.Client + chainsyncer *chainsync.Chainsync shuttermintState *smobserver.ShuttermintState metricsServer *metricsserver.MetricsServer @@ -122,7 +131,59 @@ func (kpr *KeyperCore) initOptions(ctx context.Context, runner service.Runner) e } else { kpr.blockSyncClient = kpr.opts.blockSyncClient } - return nil + + keyperSetAdded, err := synchandler.NewKeyperSetAdded( + kpr.dbpool, + kpr.blockSyncClient, + kpr.config.ContractAddresses.KeyperSetManager, + kpr.opts.ethereumAddress, + ) + if err != nil { + return err + } + + dbChainCache := chaincache.NewDatabaseChainCache(kpr.dbpool) + cachedChain, err := dbChainCache.Get(ctx) + if err != nil { + return err + } + + // The keyper hasn't been synced before. + // use the provided option to start syncing only from a specific block + if cachedChain == nil { + syncBlockNumber := kpr.opts.syncStartBlockNumber + if syncBlockNumber == nil { + syncBlockHeader, err := kpr.blockSyncClient.HeaderByNumber(ctx, nil) + if err != nil { + return fmt.Errorf("can't fetch latest head: %w", err) + } + syncBlockNumber = syncBlockHeader.Number + } + cachedBeforeSyncStart, err := kpr.blockSyncClient.HeaderByNumber(ctx, new(big.Int).Sub(syncBlockNumber, big.NewInt(1))) + if err != nil { + return fmt.Errorf("can't fetch header before the sync start: %w", err) + } + // pretend that we synced until one block before the sync start + err = dbChainCache.Update(ctx, syncer.ChainUpdateContext{ + Append: chainsegment.NewChainSegment(cachedBeforeSyncStart), + }) + if err != nil { + return fmt.Errorf("can't update database chain-cache with initial header: %w", err) + } + } + + chainsyncOpts := []chainsync.Option{ + chainsync.WithClient(kpr.blockSyncClient), + chainsync.WithContractEventHandler(keyperSetAdded), + } + for _, eh := range kpr.opts.eventHandler { + chainsyncOpts = append(chainsyncOpts, chainsync.WithContractEventHandler(eh)) + } + for _, ch := range kpr.opts.chainHandler { + chainsyncOpts = append(chainsyncOpts, chainsync.WithChainUpdateHandler(ch)) + } + kpr.chainsyncer, err = chainsync.New(chainsyncOpts...) + return err } func (kpr *KeyperCore) Start(ctx context.Context, runner service.Runner) error { @@ -169,12 +230,17 @@ func (kpr *KeyperCore) Start(ctx context.Context, runner service.Runner) error { func (kpr *KeyperCore) getServices() []service.Service { services := []service.Service{ + kpr.chainsyncer, kpr.messaging, + // TODO: put this in a chainHandler in the chainsyncer, + // but beware of historic chain updates service.Function{Func: kpr.operateShuttermint}, newEonPubKeyHandler(kpr), } keyTrigger := kpr.trigger if kpr.config.HTTPEnabled { + // allow manually triggering the decryption via the + // admin HTTP endpoint: httpServer := kprapi.NewHTTPService(kpr.dbpool, kpr.config, kpr.messaging) services = append(services, httpServer) // combine two sources of decryption triggers @@ -346,7 +412,9 @@ func (kpr *KeyperCore) handleOnChainKeyperSetChanges( } // TODO: we need a better block syncing mechanism! -// Also this is doing too much work sequentially in one routine. +// Now that we have the chainsync, we could put this in a ChainUpdateHandler. +// However we have to take care of what happens when the node restarts +// and is quickly syncing non-latest historic, but unknown blocks. func (kpr *KeyperCore) operateShuttermint(ctx context.Context, _ service.Runner) error { for { syncBlockNumber, err := retry.FunctionCall(ctx, kpr.blockSyncClient.BlockNumber) @@ -376,3 +444,7 @@ func (kpr *KeyperCore) operateShuttermint(ctx context.Context, _ service.Runner) } } } + +func (kpr *KeyperCore) GetHeaderByHash(ctx context.Context, h common.Hash) (*types.Header, error) { + return kpr.chainsyncer.GetHeaderByHash(ctx, h) +} diff --git a/rolling-shutter/keyper/kprconfig/config.go b/rolling-shutter/keyper/kprconfig/config.go index e2cf76f55..cbb7b3283 100644 --- a/rolling-shutter/keyper/kprconfig/config.go +++ b/rolling-shutter/keyper/kprconfig/config.go @@ -18,6 +18,10 @@ import ( var _ configuration.Config = &ShuttermintConfig{} +type ContractAddresses struct { + KeyperSetManager common.Address +} + type Config struct { InstanceID uint64 DatabaseURL string @@ -31,6 +35,7 @@ type Config struct { Metrics *metricsserver.MetricsConfig MaxNumKeysPerMessage uint64 + ContractAddresses ContractAddresses } func (c *Config) GetAddress() common.Address { diff --git a/rolling-shutter/keyper/options.go b/rolling-shutter/keyper/options.go index 3c1b05d36..7d7ea2385 100644 --- a/rolling-shutter/keyper/options.go +++ b/rolling-shutter/keyper/options.go @@ -3,43 +3,47 @@ package keyper import ( "context" "errors" - "reflect" + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/jackc/pgx/v4/pgxpool" - "github.com/shutter-network/rolling-shutter/rolling-shutter/contract" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" ) type Option func(*options) error type options struct { - dbpool *pgxpool.Pool - broadcastEonPubKey bool - messaging p2p.Messaging - blockSyncClient *ethclient.Client - messageHandler []p2p.MessageHandler - eonPubkeyHandler EonPublicKeyHandlerFunc + dbpool *pgxpool.Pool + broadcastEonPubKey bool + messaging p2p.Messaging + syncStartBlockNumber *big.Int + blockSyncClient *ethclient.Client + messageHandler []p2p.MessageHandler + eonPubkeyHandler EonPublicKeyHandlerFunc + ethereumAddress common.Address + chainHandler []syncer.ChainUpdateHandler + eventHandler []syncer.ContractEventHandler } func newDefaultOptions() *options { return &options{ - dbpool: nil, broadcastEonPubKey: true, - blockSyncClient: nil, messageHandler: []p2p.MessageHandler{}, - eonPubkeyHandler: nil, + chainHandler: []syncer.ChainUpdateHandler{}, + eventHandler: []syncer.ContractEventHandler{}, } } -var keyperNewConfigType = reflect.TypeOf(contract.KeypersConfigsListNewConfig{}) - func validateOptions(o *options) error { if !o.broadcastEonPubKey && o.eonPubkeyHandler == nil { return errors.New("no eon public key broadcast nor handler function provided. " + "newly negotiated eon public-keys would not be forwarded") } + // TODO: check for non-nil contract addresses + // TODO: ethereum address required return nil } @@ -106,3 +110,35 @@ func WithMessaging(sender p2p.Messaging) Option { return nil } } + +// TODO: docs. +func WithContractEventHandler(h syncer.ContractEventHandler) Option { + return func(o *options) error { + o.eventHandler = append(o.eventHandler, h) + return nil + } +} + +// TODO: docs. +func WithChainUpdateHandler(h syncer.ChainUpdateHandler) Option { + return func(o *options) error { + o.chainHandler = append(o.chainHandler, h) + return nil + } +} + +// TODO: docs. +func WithEthereumAddress(address common.Address) Option { + return func(o *options) error { + o.ethereumAddress = address + return nil + } +} + +// TODO: use this e.g. with e.g. the Gnosis keyper and the gnosis config value "SyncStartBlockNumber". +func WithSyncStartBlockNumber(num big.Int) Option { + return func(o *options) error { + o.syncStartBlockNumber = &num + return nil + } +} diff --git a/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go b/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go index 77d4a0840..682f45f60 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go +++ b/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go @@ -184,22 +184,6 @@ func (q *Queries) GetTransactionSubmittedEvents(ctx context.Context, arg GetTran return items, nil } -const getTransactionSubmittedEventsSyncedUntil = `-- name: GetTransactionSubmittedEventsSyncedUntil :one -SELECT enforce_one_row, block_hash, block_number, slot FROM transaction_submitted_events_synced_until LIMIT 1 -` - -func (q *Queries) GetTransactionSubmittedEventsSyncedUntil(ctx context.Context) (TransactionSubmittedEventsSyncedUntil, error) { - row := q.db.QueryRow(ctx, getTransactionSubmittedEventsSyncedUntil) - var i TransactionSubmittedEventsSyncedUntil - err := row.Scan( - &i.EnforceOneRow, - &i.BlockHash, - &i.BlockNumber, - &i.Slot, - ) - return i, err -} - const getTxPointer = `-- name: GetTxPointer :one SELECT eon, age, value FROM tx_pointer WHERE eon = $1 @@ -444,23 +428,6 @@ func (q *Queries) SetCurrentDecryptionTrigger(ctx context.Context, arg SetCurren return err } -const setTransactionSubmittedEventsSyncedUntil = `-- name: SetTransactionSubmittedEventsSyncedUntil :exec -INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number, slot) VALUES ($1, $2, $3) -ON CONFLICT (enforce_one_row) DO UPDATE -SET block_hash = $1, block_number = $2, slot = $3 -` - -type SetTransactionSubmittedEventsSyncedUntilParams struct { - BlockHash []byte - BlockNumber int64 - Slot int64 -} - -func (q *Queries) SetTransactionSubmittedEventsSyncedUntil(ctx context.Context, arg SetTransactionSubmittedEventsSyncedUntilParams) error { - _, err := q.db.Exec(ctx, setTransactionSubmittedEventsSyncedUntil, arg.BlockHash, arg.BlockNumber, arg.Slot) - return err -} - const setTxPointer = `-- name: SetTxPointer :exec INSERT INTO tx_pointer (eon, age, value) VALUES ($1, $2, $3) diff --git a/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql b/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql index e2ffa646e..945d06ce6 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql +++ b/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql @@ -26,13 +26,6 @@ WHERE eon = $1 AND index >= $2 AND index < $2 + $3 ORDER BY index ASC LIMIT $3; --- name: SetTransactionSubmittedEventsSyncedUntil :exec -INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number, slot) VALUES ($1, $2, $3) -ON CONFLICT (enforce_one_row) DO UPDATE -SET block_hash = $1, block_number = $2, slot = $3; - --- name: GetTransactionSubmittedEventsSyncedUntil :one -SELECT * FROM transaction_submitted_events_synced_until LIMIT 1; -- name: GetLatestTransactionSubmittedEvent :one SELECT * FROM transaction_submitted_event ORDER BY block_number DESC diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index 2d056097b..b29b0763e 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -2,16 +2,14 @@ package gnosis import ( "context" + "math/big" "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - gethLog "github.com/ethereum/go-ethereum/log" "github.com/jackc/pgx/v4/pgxpool" "github.com/pkg/errors" "github.com/rs/zerolog/log" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" - "golang.org/x/exp/slog" "github.com/shutter-network/rolling-shutter/rolling-shutter/eonkeypublisher" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" @@ -19,11 +17,13 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/synchandler" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" - chainsync "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync" - syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync/event" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/errs" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/slotticker" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" @@ -44,16 +44,11 @@ type Keyper struct { config *config.Config dbpool *pgxpool.Pool beaconAPIClient *beaconapiclient.Client + gnosisEthClient *ethclient.Client - chainSyncClient *chainsync.Client - sequencerSyncer *SequencerSyncer - validatorSyncer *ValidatorSyncer eonKeyPublisher *eonkeypublisher.EonKeyPublisher latestTriggeredSlot *uint64 - // input events - newBlocks chan *syncevent.LatestBlock - newKeyperSets chan *syncevent.KeyperSet newEonPublicKeys chan keyper.EonPublicKey slotTicker *slotticker.SlotTicker @@ -70,11 +65,8 @@ func New(c *config.Config) *Keyper { func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { var err error - kpr.newBlocks = make(chan *syncevent.LatestBlock) - kpr.newKeyperSets = make(chan *syncevent.KeyperSet) kpr.newEonPublicKeys = make(chan keyper.EonPublicKey) kpr.decryptionTriggerChannel = make(chan *broker.Event[*epochkghandler.DecryptionTrigger]) - kpr.latestTriggeredSlot = nil offset := -(time.Duration(kpr.config.Gnosis.SecondsPerSlot) * time.Second) * @@ -95,40 +87,25 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return errors.Wrap(err, "failed to initialize beacon API client") } - messageSender, err := p2p.New(kpr.config.P2P) + messaging, err := p2p.New(kpr.config.P2P) if err != nil { return errors.Wrap(err, "failed to initialize p2p messaging") } - messageSender.AddMessageHandler(&DecryptionKeySharesHandler{kpr.dbpool}) - messageSender.AddMessageHandler(&DecryptionKeysHandler{kpr.dbpool}) - messagingMiddleware := NewMessagingMiddleware(messageSender, kpr.dbpool, kpr.config) + messaging.AddMessageHandler(&DecryptionKeySharesHandler{kpr.dbpool}) + messaging.AddMessageHandler(&DecryptionKeysHandler{kpr.dbpool}) - kpr.core, err = NewKeyper(kpr, messagingMiddleware) + chainID, err := kpr.gnosisEthClient.ChainID(ctx) if err != nil { - return errors.Wrap(err, "can't instantiate keyper core") - } - - kpr.chainSyncClient, err = chainsync.NewClient( - ctx, - chainsync.WithClientURL(kpr.config.Gnosis.Node.EthereumURL), - chainsync.WithKeyperSetManager(kpr.config.Gnosis.Contracts.KeyperSetManager), - chainsync.WithKeyBroadcastContract(kpr.config.Gnosis.Contracts.KeyBroadcastContract), - chainsync.WithSyncNewBlock(kpr.channelNewBlock), - chainsync.WithSyncNewKeyperSet(kpr.channelNewKeyperSet), - chainsync.WithPrivateKey(kpr.config.Gnosis.Node.PrivateKey.Key), - chainsync.WithLogger(gethLog.NewLogger(slog.Default().Handler())), - ) - if err != nil { - return err + return errors.Wrap(err, "failed to get chain ID") } - eonKeyPublisherClient, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL) + kpr.gnosisEthClient, err = ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL) if err != nil { - return errors.Wrapf(err, "failed to dial ethereum node at %s", kpr.config.Gnosis.Node.EthereumURL) + return errors.Wrapf(err, "failed to dial gnosis node at %s", kpr.config.Gnosis.Node.EthereumURL) } kpr.eonKeyPublisher, err = eonkeypublisher.NewEonKeyPublisher( kpr.dbpool, - eonKeyPublisherClient, + kpr.gnosisEthClient, kpr.config.Gnosis.Contracts.KeyperSetManager, kpr.config.Gnosis.Node.PrivateKey.Key, ) @@ -136,14 +113,39 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return errors.Wrap(err, "failed to initialize eon key publisher") } - err = kpr.initSequencerSyncer(ctx) + validatorUpdated, err := synchandler.NewValidatorUpdated( + kpr.dbpool, + kpr.gnosisEthClient, + kpr.beaconAPIClient, + kpr.config.Gnosis.Contracts.ValidatorRegistry, + chainID.Uint64(), + ) if err != nil { return err } - err = kpr.initValidatorSyncer(ctx) + sequencerTxSubmitted, err := synchandler.NewSequencerTransactionSubmitted( + kpr.dbpool, + kpr.config.Gnosis.Contracts.Sequencer, + ) if err != nil { return err } + err = kpr.initKeyperCore( + messaging, + []syncer.ChainUpdateHandler{ + // trigger possible decryption on every new block header from Gnosis chain + synchandler.NewDecryptOnChainUpdate(kpr.maybeDecryptOnNewHeader), + }, + []syncer.ContractEventHandler{ + // process the SequencerTransactionSubmitted events from Gnosis chain + sequencerTxSubmitted, + // process the ValidatorUpdated events from Gnosis chain + validatorUpdated, + }, + ) + if err != nil { + return errors.Wrap(err, "can't instantiate keyper core") + } // Set all transaction pointer ages to infinity. They will be reset to zero when the next // decryption keys arrive, telling us the agreed upon pointer value. Pointer values that are @@ -154,13 +156,48 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { if err != nil { return errors.Wrap(err, "failed to reset transaction pointer age") } - runner.Go(func() error { return kpr.processInputs(ctx) }) - return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher) + return runner.StartService(kpr.core, kpr.slotTicker, kpr.eonKeyPublisher) } -func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) { - core, err := keyper.New( +func (kpr *Keyper) initKeyperCore( + messaging *p2p.P2PMessaging, + chainUpdateHandler []syncer.ChainUpdateHandler, + eventHandler []syncer.ContractEventHandler, +) error { + opts := []keyper.Option{ + // re-use the gnosis client to receive chain updates + keyper.WithBlockSyncClient(kpr.gnosisEthClient), + keyper.WithDBPool(kpr.dbpool), + // P2P messaging - we use a middleware to inject some special + // functionality + keyper.WithMessaging( + NewMessagingMiddleware(messaging, kpr.dbpool, kpr.config), + ), + + // don't broadcast generated eon public keys to the P2P network, + keyper.NoBroadcastEonPublicKey(), + // but instead push them on the internal channel, + // where it gets written to a contract onchain + keyper.WithEonPublicKeyHandler(kpr.sendNewEonPubkeyToChannel), + + // This will only start syncing blockchain events + // from a specific block on, and only when we never synced before. + // Otherwise, it will pick up syncing where we last stopped. + // Beware that this has to be a block at or before where the keyper-set was + // added (not started) onchain. + // Otherwise the node will miss the event and never know it is + // part of the keyper-set. + keyper.WithSyncStartBlockNumber(*new(big.Int).SetUint64(kpr.config.Gnosis.SyncStartBlockNumber)), + } + for _, h := range chainUpdateHandler { + opts = append(opts, keyper.WithChainUpdateHandler(h)) + } + for _, h := range eventHandler { + opts = append(opts, keyper.WithContractEventHandler(h)) + } + var err error + kpr.core, err = keyper.New( &kprconfig.Config{ InstanceID: kpr.config.InstanceID, DatabaseURL: kpr.config.DatabaseURL, @@ -171,129 +208,51 @@ func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.K Shuttermint: kpr.config.Shuttermint, Metrics: kpr.config.Metrics, MaxNumKeysPerMessage: kpr.config.MaxNumKeysPerMessage, + // The keyper core needs the implementation addresses of the core + // contracts on the specific chain. + ContractAddresses: kprconfig.ContractAddresses{ + KeyperSetManager: kpr.config.Gnosis.Contracts.KeyperSetManager, + }, }, + // send events to this channel to trigger decryption in the keyper core kpr.decryptionTriggerChannel, - keyper.WithDBPool(kpr.dbpool), - keyper.NoBroadcastEonPublicKey(), - keyper.WithEonPublicKeyHandler(kpr.channelNewEonPublicKey), - keyper.WithMessaging(messagingMiddleware), + opts..., ) - return core, err -} - -// initSequencerSycer initializes the sequencer syncer if the keyper is known to be a member of a -// keyper set. Otherwise, the syncer will only be initialized once such a keyper set is observed to -// be added, as only then we will know which eon(s) we are responsible for. -func (kpr *Keyper) initSequencerSyncer(ctx context.Context) error { - client, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL) - if err != nil { - return errors.Wrap(err, "failed to dial Ethereum execution node") - } - - log.Info(). - Str("contract-address", kpr.config.Gnosis.Contracts.KeyperSetManager.Hex()). - Msg("initializing sequencer syncer") - contract, err := sequencerBindings.NewSequencer(kpr.config.Gnosis.Contracts.Sequencer, client) - if err != nil { - return err - } - kpr.sequencerSyncer = &SequencerSyncer{ - Contract: contract, - DBPool: kpr.dbpool, - ExecutionClient: client, - GenesisSlotTimestamp: kpr.config.Gnosis.GenesisSlotTimestamp, - SecondsPerSlot: kpr.config.Gnosis.SecondsPerSlot, - SyncStartBlockNumber: kpr.config.Gnosis.SyncStartBlockNumber, - } - - // Perform an initial sync now because it might take some time and doing so during regular - // slot processing might hold up things - latestHeader, err := client.HeaderByNumber(ctx, nil) - if err != nil { - return errors.Wrap(err, "failed to get latest block header") - } - err = kpr.sequencerSyncer.Sync(ctx, latestHeader) - if err != nil { - return err - } - - return nil + return err } -func (kpr *Keyper) initValidatorSyncer(ctx context.Context) error { - validatorSyncerClient, err := ethclient.DialContext(ctx, kpr.config.Gnosis.Node.EthereumURL) - if err != nil { - return errors.Wrap(err, "failed to dial ethereum node") - } - chainID, err := validatorSyncerClient.ChainID(ctx) - if err != nil { - return errors.Wrap(err, "failed to get chain ID") - } - validatorRegistryContract, err := validatorRegistryBindings.NewValidatorregistry( - kpr.config.Gnosis.Contracts.ValidatorRegistry, - validatorSyncerClient, +func (kpr *Keyper) maybeDecryptOnNewHeader(ctx context.Context, header *types.Header) error { + slot := medley.BlockTimestampToSlot( + header.Time, + kpr.config.Gnosis.GenesisSlotTimestamp, + kpr.config.Gnosis.SecondsPerSlot, ) - if err != nil { - return errors.Wrap(err, "failed to instantiate validator registry contract") - } - kpr.validatorSyncer = &ValidatorSyncer{ - Contract: validatorRegistryContract, - DBPool: kpr.dbpool, - BeaconAPIClient: kpr.beaconAPIClient, - ExecutionClient: validatorSyncerClient, - ChainID: chainID.Uint64(), - SyncStartBlockNumber: kpr.config.Gnosis.SyncStartBlockNumber, - } - - // Perform an initial sync now because it might take some time and doing so during regular - // slot processing might hold up things - latestHeader, err := validatorSyncerClient.HeaderByNumber(ctx, nil) - if err != nil { - return errors.Wrap(err, "failed to get latest block header") - } - err = kpr.validatorSyncer.Sync(ctx, latestHeader) - if err != nil { - return err - } - return nil + return kpr.maybeDecryptOnNewSlot(ctx, slot+1) } func (kpr *Keyper) processInputs(ctx context.Context) error { - var err error for { select { - case ev := <-kpr.newBlocks: - err = kpr.processNewBlock(ctx, ev) - case ev := <-kpr.newKeyperSets: - err = kpr.processNewKeyperSet(ctx, ev) - case ev := <-kpr.newEonPublicKeys: - err = kpr.processNewEonPublicKey(ctx, ev) + case key := <-kpr.newEonPublicKeys: + kpr.eonKeyPublisher.Publish(key) case slot := <-kpr.slotTicker.C: - err = kpr.processNewSlot(ctx, slot) + logger := log.Logger.With().Uint64("slot-number", slot.Number).Time("slot-start", slot.Start()).Logger() + logger.Debug().Msg("slot ticker fired, try decrypting") + if err := kpr.maybeDecryptOnNewSlot(ctx, slot.Number); err != nil { + // TODO: Check if it's safe to drop those events. If not, we should store the + // ones that remain on the channel in the db and process them when we restart. + if errors.Is(err, errs.ErrCritical) { + return err + } + logger.Error().Err(err).Msg("error trying to decrypt on new slot") + } case <-ctx.Done(): return ctx.Err() } - if err != nil { - // TODO: Check if it's safe to drop those events. If not, we should store the - // ones that remain on the channel in the db and process them when we restart. - // TODO: also, should we stop the keyper or just log the error and continue? - // return err - log.Error().Err(err).Msg("error processing event") - } } } -func (kpr *Keyper) channelNewBlock(_ context.Context, ev *syncevent.LatestBlock) error { - kpr.newBlocks <- ev - return nil -} - -func (kpr *Keyper) channelNewKeyperSet(_ context.Context, ev *syncevent.KeyperSet) error { - kpr.newKeyperSets <- ev - return nil -} - -func (kpr *Keyper) channelNewEonPublicKey(_ context.Context, key keyper.EonPublicKey) error { +func (kpr *Keyper) sendNewEonPubkeyToChannel(_ context.Context, key keyper.EonPublicKey) error { kpr.newEonPublicKeys <- key return nil } diff --git a/rolling-shutter/keyperimpl/gnosis/newblock.go b/rolling-shutter/keyperimpl/gnosis/newblock.go deleted file mode 100644 index ca122bb6e..000000000 --- a/rolling-shutter/keyperimpl/gnosis/newblock.go +++ /dev/null @@ -1,26 +0,0 @@ -package gnosis - -import ( - "context" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" - syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync/event" -) - -func (kpr *Keyper) processNewBlock(ctx context.Context, ev *syncevent.LatestBlock) error { - if kpr.sequencerSyncer != nil { - if err := kpr.sequencerSyncer.Sync(ctx, ev.Header); err != nil { - return err - } - } - err := kpr.validatorSyncer.Sync(ctx, ev.Header) - if err != nil { - return err - } - slot := medley.BlockTimestampToSlot( - ev.Header.Time, - kpr.config.Gnosis.GenesisSlotTimestamp, - kpr.config.Gnosis.SecondsPerSlot, - ) - return kpr.maybeTriggerDecryption(ctx, slot+1) -} diff --git a/rolling-shutter/keyperimpl/gnosis/neweonpublickey.go b/rolling-shutter/keyperimpl/gnosis/neweonpublickey.go deleted file mode 100644 index cdba7f693..000000000 --- a/rolling-shutter/keyperimpl/gnosis/neweonpublickey.go +++ /dev/null @@ -1,12 +0,0 @@ -package gnosis - -import ( - "context" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" -) - -func (kpr *Keyper) processNewEonPublicKey(_ context.Context, key keyper.EonPublicKey) error { //nolint: unparam - kpr.eonKeyPublisher.Publish(key) - return nil -} diff --git a/rolling-shutter/keyperimpl/gnosis/newkeyperset.go b/rolling-shutter/keyperimpl/gnosis/newkeyperset.go deleted file mode 100644 index afedd4205..000000000 --- a/rolling-shutter/keyperimpl/gnosis/newkeyperset.go +++ /dev/null @@ -1,55 +0,0 @@ -package gnosis - -import ( - "context" - - "github.com/jackc/pgx/v4" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - - obskeyper "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" - syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync/event" - "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" -) - -func (kpr *Keyper) processNewKeyperSet(ctx context.Context, ev *syncevent.KeyperSet) error { - isMember := false - for _, m := range ev.Members { - if m.Cmp(kpr.config.GetAddress()) == 0 { - isMember = true - break - } - } - log.Info(). - Uint64("activation-block", ev.ActivationBlock). - Uint64("eon", ev.Eon). - Int("num-members", len(ev.Members)). - Uint64("threshold", ev.Threshold). - Bool("is-member", isMember). - Msg("new keyper set added") - - return kpr.dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { - obskeyperdb := obskeyper.New(tx) - - keyperConfigIndex, err := medley.Uint64ToInt64Safe(ev.Eon) - if err != nil { - return errors.Wrap(err, ErrParseKeyperSet.Error()) - } - activationBlockNumber, err := medley.Uint64ToInt64Safe(ev.ActivationBlock) - if err != nil { - return errors.Wrap(err, ErrParseKeyperSet.Error()) - } - threshold, err := medley.Uint64ToInt64Safe(ev.Threshold) - if err != nil { - return errors.Wrap(err, ErrParseKeyperSet.Error()) - } - - return obskeyperdb.InsertKeyperSet(ctx, obskeyper.InsertKeyperSetParams{ - KeyperConfigIndex: keyperConfigIndex, - ActivationBlockNumber: activationBlockNumber, - Keypers: shdb.EncodeAddresses(ev.Members), - Threshold: int32(threshold), - }) - }) -} diff --git a/rolling-shutter/keyperimpl/gnosis/newslot.go b/rolling-shutter/keyperimpl/gnosis/newslot.go index fafd7d77f..535181fa9 100644 --- a/rolling-shutter/keyperimpl/gnosis/newslot.go +++ b/rolling-shutter/keyperimpl/gnosis/newslot.go @@ -23,44 +23,59 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/slotticker" "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" ) -func (kpr *Keyper) processNewSlot(ctx context.Context, slot slotticker.Slot) error { - return kpr.maybeTriggerDecryption(ctx, slot.Number) -} - -// maybeTriggerDecryption triggers decryption for the given slot if -// - it hasn't been triggered for this slot before and -// - the keyper is part of the corresponding keyper set. -func (kpr *Keyper) maybeTriggerDecryption(ctx context.Context, slot uint64) error { +// maybeDecryptOnNewSlot tries to trigger decryption for the given slot. +// This will only be conducted when: +// - it hasn't been triggered for this slot before and +// - the keyper is part of the corresponding keyper set. +// +// The function is called from 2 locations: +// - on a local timer event that tries to preempt slots +// - on receival of a new block from the chain-client. +// +// Therefore it is important that it is idempotent when called with the +// same slot (see above). +func (kpr *Keyper) maybeDecryptOnNewSlot(ctx context.Context, slot uint64) error { //nolint: funlen,gocyclo if kpr.latestTriggeredSlot != nil && slot <= *kpr.latestTriggeredSlot { return nil } kpr.latestTriggeredSlot = &slot - fmt.Println("") - fmt.Println("") - fmt.Println(slot) - fmt.Println("") - fmt.Println("") + fmt.Printf("\n\n%d\n\n\n", slot) gnosisKeyperDB := gnosisdatabase.New(kpr.dbpool) - syncedUntil, err := gnosisKeyperDB.GetTransactionSubmittedEventsSyncedUntil(ctx) + latestTxSubmitted, err := gnosisKeyperDB.GetLatestTransactionSubmittedEvent(ctx) if err != nil && err != pgx.ErrNoRows { // pgx.ErrNoRows is expected if we're not part of the keyper set (which is checked later). // That's because non-keypers don't sync transaction submitted events. - return errors.Wrap(err, "failed to query transaction submitted sync status from db") + return errors.Wrap(err, "failed to query latest transaction submitted from db") + } + latestTxSubmittedHeader, err := kpr.core.GetHeaderByHash(ctx, common.BytesToHash(latestTxSubmitted.BlockHash)) + if err != nil { + return errors.Wrap(err, "failed to query header by hash") } - if syncedUntil.Slot >= int64(slot) { + syncedUntilSlot := medley.BlockTimestampToSlot( + latestTxSubmittedHeader.Time, + kpr.config.Gnosis.GenesisSlotTimestamp, + kpr.config.Gnosis.SecondsPerSlot, + ) + if syncedUntilSlot >= slot { // If we already synced the block for slot n before this slot has started on our clock, // either the previous block proposer proposed early (ie is malicious) or our clocks are // out of sync. In any case, it does not make sense to produce keys as the block has // already been built, so we return an error. return errors.Errorf("processing slot %d for which a block has already been processed", slot) } - nextBlock := syncedUntil.BlockNumber + 1 + if !latestTxSubmittedHeader.Number.IsInt64() { + return errors.New("block number int64 overflow, can't process") + } + latestTxSubmittedBlockNumber := latestTxSubmittedHeader.Number.Int64() + if latestTxSubmittedBlockNumber == math.MaxInt64 { + return errors.New("next block number int64 overflow, can't process") + } + nextBlock := latestTxSubmittedBlockNumber + 1 queries := obskeyper.New(kpr.dbpool) keyperSet, err := queries.GetKeyperSet(ctx, nextBlock) @@ -267,8 +282,11 @@ func (kpr *Keyper) triggerDecryption( Int("num-identities", len(trigger.IdentityPreimages)). Int64("tx-pointer", txPointer). Msg("sending decryption trigger") - kpr.decryptionTriggerChannel <- event + // let the keyper core handle the decryption - this channel + // receives on the keyper-core. If a result notification is required, + // we could wait for <-event.Result() somewhere: + kpr.decryptionTriggerChannel <- event return nil } diff --git a/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go b/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go deleted file mode 100644 index 15778e4e8..000000000 --- a/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go +++ /dev/null @@ -1,269 +0,0 @@ -package gnosis - -import ( - "bytes" - "context" - "fmt" - "math" - "math/big" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" - "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" -) - -const AssumedReorgDepth = 10 - -// SequencerSyncer inserts transaction submitted events from the sequencer contract into the database. -type SequencerSyncer struct { - Contract *sequencerBindings.Sequencer - DBPool *pgxpool.Pool - ExecutionClient *ethclient.Client - GenesisSlotTimestamp uint64 - SecondsPerSlot uint64 - SyncStartBlockNumber uint64 -} - -// getNumReorgedBlocks returns the number of blocks that have already been synced, but are no -// longer in the chain. -func getNumReorgedBlocks(syncedUntil *database.TransactionSubmittedEventsSyncedUntil, header *types.Header) int { - shouldBeParent := header.Number.Int64() == syncedUntil.BlockNumber+1 - isParent := bytes.Equal(header.ParentHash.Bytes(), syncedUntil.BlockHash) - isReorg := shouldBeParent && !isParent - if !isReorg { - return 0 - } - // We don't know how deep the reorg is, so we make a conservative guess. Assuming higher depths - // is safer because it means we resync a little bit more. - depth := AssumedReorgDepth - if syncedUntil.BlockNumber < int64(depth) { - return int(syncedUntil.BlockNumber) - } - return depth -} - -// resetSyncStatus clears the db from its recent history after a reorg of given depth. -func (s *SequencerSyncer) resetSyncStatus(ctx context.Context, numReorgedBlocks int) error { - if numReorgedBlocks == 0 { - return nil - } - return s.DBPool.BeginFunc(ctx, func(tx pgx.Tx) error { - queries := database.New(tx) - - syncStatus, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil { - return errors.Wrap(err, "failed to query sync status from db in order to reset it") - } - if syncStatus.BlockNumber < int64(numReorgedBlocks) { - return errors.Wrapf(err, "detected reorg deeper (%d) than blocks synced (%d)", syncStatus.BlockNumber, numReorgedBlocks) - } - - deleteFromInclusive := syncStatus.BlockNumber - int64(numReorgedBlocks) + 1 - - err = queries.DeleteTransactionSubmittedEventsFromBlockNumber(ctx, deleteFromInclusive) - if err != nil { - return errors.Wrap(err, "failed to delete transaction submitted events from db") - } - // Currently, we don't have enough information in the db to populate block hash and slot. - // However, using default values here is fine since the syncer is expected to resync - // immediately after this function call which will set the correct values. When we do proper - // reorg handling, we should store the full block data of the previous blocks so that we can - // avoid this. - newSyncedUntilBlockNumber := deleteFromInclusive - 1 - err = queries.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{}, - BlockNumber: newSyncedUntilBlockNumber, - Slot: 0, - }) - if err != nil { - return errors.Wrap(err, "failed to reset transaction submitted event sync status in db") - } - log.Info(). - Int("depth", numReorgedBlocks). - Int64("previous-synced-until", syncStatus.BlockNumber). - Int64("new-synced-until", newSyncedUntilBlockNumber). - Msg("sync status reset due to reorg") - return nil - }) -} - -func (s *SequencerSyncer) handlePotentialReorg(ctx context.Context, header *types.Header) error { - queries := database.New(s.DBPool) - syncedUntil, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err == pgx.ErrNoRows { - return nil - } - if err != nil { - return errors.Wrap(err, "failed to query transaction submitted events sync status") - } - - numReorgedBlocks := getNumReorgedBlocks(&syncedUntil, header) - if numReorgedBlocks > 0 { - return s.resetSyncStatus(ctx, numReorgedBlocks) - } - return nil -} - -// Sync fetches transaction submitted events from the sequencer contract and inserts them into the -// database. It starts at the end point of the previous call to sync (or 0 if it is the first call) -// and ends at the given block number. -func (s *SequencerSyncer) Sync(ctx context.Context, header *types.Header) error { - if err := s.handlePotentialReorg(ctx, header); err != nil { - return err - } - - queries := database.New(s.DBPool) - syncedUntil, err := queries.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil && err != pgx.ErrNoRows { - return errors.Wrap(err, "failed to query transaction submitted events sync status") - } - var start uint64 - if err == pgx.ErrNoRows { - start = s.SyncStartBlockNumber - } else { - start = uint64(syncedUntil.BlockNumber + 1) - } - endBlock := header.Number.Uint64() - log.Debug(). - Uint64("start-block", start). - Uint64("end-block", endBlock). - Msg("syncing sequencer contract") - - syncRanges := medley.GetSyncRanges(start, endBlock, maxRequestBlockRange) - for _, r := range syncRanges { - err = s.syncRange(ctx, r[0], r[1]) - if err != nil { - return err - } - } - return nil -} - -func (s *SequencerSyncer) syncRange( - ctx context.Context, - start, - end uint64, -) error { - events, err := s.fetchEvents(ctx, start, end) - if err != nil { - return err - } - filteredEvents := s.filterEvents(events) - - header, err := s.ExecutionClient.HeaderByNumber(ctx, new(big.Int).SetUint64(end)) - if err != nil { - return errors.Wrap(err, "failed to get execution block header by number") - } - err = s.DBPool.BeginFunc(ctx, func(tx pgx.Tx) error { - err = s.insertTransactionSubmittedEvents(ctx, tx, filteredEvents) - if err != nil { - return err - } - - slot := medley.BlockTimestampToSlot(header.Time, s.GenesisSlotTimestamp, s.SecondsPerSlot) - return database.New(tx).SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockNumber: int64(end), - BlockHash: header.Hash().Bytes(), - Slot: int64(slot), - }) - }) - log.Info(). - Uint64("start-block", start). - Uint64("end-block", end). - Int("num-inserted-events", len(filteredEvents)). - Int("num-discarded-events", len(events)-len(filteredEvents)). - Msg("synced sequencer contract") - metrics.TxSubmittedEventsSyncedUntil.Set(float64(end)) - return nil -} - -func (s *SequencerSyncer) fetchEvents( - ctx context.Context, - start, - end uint64, -) ([]*sequencerBindings.SequencerTransactionSubmitted, error) { - opts := bind.FilterOpts{ - Start: start, - End: &end, - Context: ctx, - } - it, err := s.Contract.SequencerFilterer.FilterTransactionSubmitted(&opts) - if err != nil { - return nil, errors.Wrap(err, "failed to query transaction submitted events") - } - events := []*sequencerBindings.SequencerTransactionSubmitted{} - for it.Next() { - events = append(events, it.Event) - } - if it.Error() != nil { - return nil, errors.Wrap(it.Error(), "failed to iterate transaction submitted events") - } - return events, nil -} - -func (s *SequencerSyncer) filterEvents( - events []*sequencerBindings.SequencerTransactionSubmitted, -) []*sequencerBindings.SequencerTransactionSubmitted { - filteredEvents := []*sequencerBindings.SequencerTransactionSubmitted{} - for _, event := range events { - if event.Eon > math.MaxInt64 || - !event.GasLimit.IsInt64() { - log.Debug(). - Uint64("eon", event.Eon). - Uint64("block-number", event.Raw.BlockNumber). - Str("block-hash", event.Raw.BlockHash.Hex()). - Uint("tx-index", event.Raw.TxIndex). - Uint("log-index", event.Raw.Index). - Msg("ignoring transaction submitted event with high eon") - continue - } - filteredEvents = append(filteredEvents, event) - } - return filteredEvents -} - -// insertTransactionSubmittedEvents inserts the given events into the database and updates the -// transaction submitted event number accordingly. -func (s *SequencerSyncer) insertTransactionSubmittedEvents( - ctx context.Context, - tx pgx.Tx, - events []*sequencerBindings.SequencerTransactionSubmitted, -) error { - queries := database.New(tx) - for _, event := range events { - _, err := queries.InsertTransactionSubmittedEvent(ctx, database.InsertTransactionSubmittedEventParams{ - Index: int64(event.TxIndex), - BlockNumber: int64(event.Raw.BlockNumber), - BlockHash: event.Raw.BlockHash[:], - TxIndex: int64(event.Raw.TxIndex), - LogIndex: int64(event.Raw.Index), - Eon: int64(event.Eon), - IdentityPrefix: event.IdentityPrefix[:], - Sender: shdb.EncodeAddress(event.Sender), - GasLimit: event.GasLimit.Int64(), - }) - if err != nil { - return errors.Wrap(err, "failed to insert transaction submitted event into db") - } - metrics.LatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex)) - log.Debug(). - Uint64("index", event.TxIndex). - Uint64("block", event.Raw.BlockNumber). - Uint64("eon", event.Eon). - Hex("identityPrefix", event.IdentityPrefix[:]). - Hex("sender", event.Sender.Bytes()). - Uint64("gasLimit", event.GasLimit.Uint64()). - Msg("synced new transaction submitted event") - } - return nil -}