Skip to content

Commit e9c5136

Browse files
committed
chore(chainsync): add unit test for reorgs
1 parent 805f025 commit e9c5136

File tree

10 files changed

+298
-13
lines changed

10 files changed

+298
-13
lines changed

rolling-shutter/medley/chainsync/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{}
2424
var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option")
2525

2626
type Client struct {
27-
client.EthereumClient
27+
client.SyncEthereumClient
2828
log log.Logger
2929

3030
options *options
@@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by
136136
// This value is cached, since it is not expected to change.
137137
func (s *Client) ChainID(ctx context.Context) (*big.Int, error) {
138138
if s.chainID == nil {
139-
cid, err := s.EthereumClient.ChainID(ctx)
139+
cid, err := s.SyncEthereumClient.ChainID(ctx)
140140
if err != nil {
141141
return nil, err
142142
}

rolling-shutter/medley/chainsync/client/client.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/ethereum/go-ethereum/core/types"
1010
)
1111

12-
type EthereumClient interface {
12+
type FullEthereumClient interface {
1313
Close()
1414
ChainID(ctx context.Context) (*big.Int, error)
1515
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
@@ -45,3 +45,14 @@ type EthereumClient interface {
4545
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
4646
SendTransaction(ctx context.Context, tx *types.Transaction) error
4747
}
48+
49+
type SyncEthereumClient interface {
50+
Close()
51+
ChainID(ctx context.Context) (*big.Int, error)
52+
BlockNumber(ctx context.Context) (uint64, error)
53+
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
54+
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
55+
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
56+
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
57+
SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
"math/big"
7+
8+
"github.com/ethereum/go-ethereum"
9+
"github.com/ethereum/go-ethereum/common"
10+
"github.com/ethereum/go-ethereum/core/types"
11+
)
12+
13+
var ErrNotImplemented = errors.New("not implemented")
14+
15+
var _ SyncEthereumClient = &TestClient{}
16+
17+
type TestClient struct {
18+
headerChain []*types.Header
19+
latestHeadIndex int
20+
intialProgress bool
21+
latestHeadEmitter []chan<- *types.Header
22+
latestHeadSubscription []*Subscription
23+
}
24+
25+
func NewSubscription(idx int) *Subscription {
26+
return &Subscription{
27+
idx: idx,
28+
err: make(chan error, 1),
29+
}
30+
}
31+
32+
type Subscription struct {
33+
idx int
34+
err chan error
35+
}
36+
37+
func (su *Subscription) Unsubscribe() {
38+
// TODO: not implemented yet, but we don't want to panic
39+
}
40+
41+
func (su *Subscription) Err() <-chan error {
42+
return su.err
43+
}
44+
45+
type TestClientController struct {
46+
c *TestClient
47+
}
48+
49+
func NewTestClient() (*TestClient, *TestClientController) {
50+
c := &TestClient{
51+
headerChain: []*types.Header{},
52+
latestHeadIndex: 0,
53+
}
54+
ctrl := &TestClientController{c}
55+
return c, ctrl
56+
}
57+
58+
func (c *TestClientController) ProgressHead() bool {
59+
if c.c.latestHeadIndex >= len(c.c.headerChain)-1 {
60+
return false
61+
}
62+
c.c.latestHeadIndex++
63+
return true
64+
}
65+
66+
func (c *TestClientController) EmitEvents(ctx context.Context) error {
67+
if len(c.c.latestHeadEmitter) == 0 {
68+
return nil
69+
}
70+
h := c.c.getLatestHeader()
71+
for _, em := range c.c.latestHeadEmitter {
72+
select {
73+
case em <- h:
74+
case <-ctx.Done():
75+
return ctx.Err()
76+
}
77+
}
78+
return nil
79+
}
80+
81+
func (c *TestClientController) AppendNextHeaders(h ...*types.Header) {
82+
c.c.headerChain = append(c.c.headerChain, h...)
83+
}
84+
85+
func (t *TestClient) ChainID(_ context.Context) (*big.Int, error) {
86+
return big.NewInt(42), nil
87+
}
88+
89+
func (t *TestClient) Close() {
90+
// TODO: cleanup
91+
}
92+
93+
func (t *TestClient) getLatestHeader() *types.Header {
94+
if len(t.headerChain) == 0 {
95+
return nil
96+
}
97+
return t.headerChain[t.latestHeadIndex]
98+
}
99+
100+
func (t *TestClient) searchBlock(f func(*types.Header) bool) *types.Header {
101+
for i := t.latestHeadIndex; i >= 0; i-- {
102+
h := t.headerChain[i]
103+
if f(h) {
104+
return h
105+
}
106+
}
107+
return nil
108+
}
109+
110+
func (t *TestClient) searchBlockByNumber(number *big.Int) *types.Header {
111+
return t.searchBlock(
112+
func(h *types.Header) bool {
113+
return h.Number.Cmp(number) == 0
114+
})
115+
}
116+
117+
func (t *TestClient) searchBlockByHash(hash common.Hash) *types.Header {
118+
return t.searchBlock(
119+
func(h *types.Header) bool {
120+
return hash.Cmp(h.Hash()) == 0
121+
})
122+
}
123+
124+
func (t *TestClient) BlockNumber(_ context.Context) (uint64, error) {
125+
return t.getLatestHeader().Nonce.Uint64(), nil
126+
}
127+
128+
func (t *TestClient) HeaderByHash(_ context.Context, hash common.Hash) (*types.Header, error) {
129+
h := t.searchBlockByHash(hash)
130+
if h == nil {
131+
// TODO: what error?
132+
}
133+
return h, nil
134+
}
135+
136+
func (t *TestClient) HeaderByNumber(_ context.Context, number *big.Int) (*types.Header, error) {
137+
if number == nil {
138+
return t.getLatestHeader(), nil
139+
}
140+
h := t.searchBlockByNumber(number)
141+
if h == nil {
142+
// TODO: what error?
143+
}
144+
return h, nil
145+
}
146+
147+
func (t *TestClient) SubscribeNewHead(_ context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
148+
t.latestHeadEmitter = append(t.latestHeadEmitter, ch)
149+
su := NewSubscription(len(t.latestHeadSubscription) - 1)
150+
t.latestHeadSubscription = append(t.latestHeadSubscription, su)
151+
// TODO: unsubscribe and deleting from the array
152+
// TODO: filling error promise in the subscription
153+
return su, nil
154+
}
155+
156+
func (t *TestClient) FilterLogs(_ context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
157+
panic(ErrNotImplemented)
158+
}
159+
160+
func (t *TestClient) SubscribeFilterLogs(_ context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
161+
panic(ErrNotImplemented)
162+
}

rolling-shutter/medley/chainsync/options.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type options struct {
2424
keyperSetManagerAddress *common.Address
2525
keyBroadcastContractAddress *common.Address
2626
clientURL string
27-
ethClient syncclient.EthereumClient
27+
ethClient syncclient.FullEthereumClient
2828
logger log.Logger
2929
runner service.Runner
3030
syncStart *number.BlockNumber
@@ -122,7 +122,7 @@ func (o *options) applyHandler(c *Client) error {
122122
// of shutter clients background workers.
123123
func (o *options) apply(ctx context.Context, c *Client) error {
124124
var (
125-
client syncclient.EthereumClient
125+
client syncclient.SyncEthereumClient
126126
err error
127127
)
128128
if o.clientURL != "" {
@@ -132,12 +132,12 @@ func (o *options) apply(ctx context.Context, c *Client) error {
132132
}
133133
}
134134
client = o.ethClient
135-
c.EthereumClient = client
135+
c.SyncEthereumClient = client
136136

137137
// the nil passthrough will use "latest" for each call,
138138
// but we want to harmonize and fix the sync start to a specific block.
139139
if o.syncStart.IsLatest() {
140-
latestBlock, err := c.EthereumClient.BlockNumber(ctx)
140+
latestBlock, err := c.SyncEthereumClient.BlockNumber(ctx)
141141
if err != nil {
142142
return errors.Wrap(err, "polling latest block")
143143
}
@@ -219,7 +219,7 @@ func WithLogger(l log.Logger) Option {
219219
}
220220
}
221221

222-
func WithClient(client syncclient.EthereumClient) Option {
222+
func WithClient(client syncclient.FullEthereumClient) Option {
223223
return func(o *options) error {
224224
o.ethClient = client
225225
return nil

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
var _ ManualFilterHandler = &EonPubKeySyncer{}
1818

1919
type EonPubKeySyncer struct {
20-
Client client.EthereumClient
20+
Client client.SyncEthereumClient
2121
Log log.Logger
2222
KeyBroadcast *bindings.KeyBroadcastContract
2323
KeyperSetManager *bindings.KeyperSetManager

rolling-shutter/medley/chainsync/syncer/keyperset.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const channelSize = 10
2323
var _ ManualFilterHandler = &KeyperSetSyncer{}
2424

2525
type KeyperSetSyncer struct {
26-
Client client.EthereumClient
26+
Client client.FullEthereumClient
2727
Contract *bindings.KeyperSetManager
2828
Log log.Logger
2929
Handler event.KeyperSetHandler

rolling-shutter/medley/chainsync/syncer/shutterstate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
var _ ManualFilterHandler = &ShutterStateSyncer{}
1717

1818
type ShutterStateSyncer struct {
19-
Client client.EthereumClient
19+
Client client.SyncEthereumClient
2020
Contract *bindings.KeyperSetManager
2121
Log log.Logger
2222
Handler event.ShutterStateHandler

rolling-shutter/medley/chainsync/syncer/unsafehead.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
type UnsafeHeadSyncer struct {
20-
Client client.EthereumClient
20+
Client client.SyncEthereumClient
2121
Log log.Logger
2222
Handler event.BlockHandler
2323
// Handler to be manually triggered
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package syncer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/big"
7+
"testing"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/common"
11+
"github.com/ethereum/go-ethereum/core/types"
12+
"github.com/ethereum/go-ethereum/log"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
14+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
15+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
16+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
17+
"gotest.tools/v3/assert"
18+
)
19+
20+
func MakeChain(start int64, startParent common.Hash, numHeader uint, seed int64) []*types.Header {
21+
n := numHeader
22+
parent := startParent
23+
num := big.NewInt(start)
24+
h := []*types.Header{}
25+
26+
// change the hashes for different seeds
27+
mixinh := common.BigToHash(big.NewInt(seed))
28+
for n > 0 {
29+
head := &types.Header{
30+
ParentHash: parent,
31+
Number: num,
32+
MixDigest: mixinh,
33+
}
34+
h = append(h, head)
35+
num = new(big.Int).Add(num, big.NewInt(1))
36+
parent = head.Hash()
37+
n--
38+
}
39+
return h
40+
}
41+
42+
func TestReorg(t *testing.T) {
43+
headersBeforeReorg := MakeChain(1, common.BigToHash(big.NewInt(0)), 10, 42)
44+
branchOff := headersBeforeReorg[5]
45+
// block number 5 will be reorged
46+
headersReorgBranch := MakeChain(branchOff.Number.Int64()+1, branchOff.Hash(), 10, 43)
47+
clnt, ctl := client.NewTestClient()
48+
ctl.AppendNextHeaders(headersBeforeReorg...)
49+
ctl.AppendNextHeaders(headersReorgBranch...)
50+
51+
handlerBlock := make(chan *event.LatestBlock, 1)
52+
53+
h := &UnsafeHeadSyncer{
54+
Client: clnt,
55+
Log: log.New(),
56+
Handler: func(_ context.Context, ev *event.LatestBlock) error {
57+
handlerBlock <- ev
58+
return nil
59+
},
60+
SyncedHandler: []ManualFilterHandler{},
61+
SyncStartBlock: number.NewBlockNumber(nil),
62+
FetchActiveAtStart: false,
63+
}
64+
65+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
66+
defer cancel()
67+
service.RunBackground(ctx, h)
68+
69+
// intitial sync is independent of the subscription,
70+
// this will get polled from the eth client
71+
b := <-handlerBlock
72+
assert.Assert(t, b.Number.Cmp(headersBeforeReorg[0].Number) == 0)
73+
idx := 1
74+
for {
75+
ok := ctl.ProgressHead()
76+
assert.Assert(t, ok)
77+
err := ctl.EmitEvents(ctx)
78+
assert.NilError(t, err)
79+
80+
b = <-handlerBlock
81+
assert.Equal(t, b.Number.Uint64(), headersBeforeReorg[idx].Number.Uint64(), fmt.Sprintf("block number equal for idx %d", idx))
82+
assert.Equal(t, b.BlockHash, headersBeforeReorg[idx].Hash())
83+
idx++
84+
if idx == len(headersBeforeReorg) {
85+
break
86+
}
87+
}
88+
ok := ctl.ProgressHead()
89+
assert.Assert(t, ok)
90+
err := ctl.EmitEvents(ctx)
91+
assert.NilError(t, err)
92+
b = <-handlerBlock
93+
// now the reorg should have happened.
94+
// the handler should have emitted an "artificial" latest head
95+
// event for the block BEFORE the re-orged block
96+
assert.Equal(t, b.Number.Uint64(), headersReorgBranch[0].Number.Uint64()-1, "block number equal for reorg")
97+
assert.Equal(t, b.BlockHash, headersReorgBranch[0].ParentHash)
98+
idx = 0
99+
for ctl.ProgressHead() {
100+
assert.Assert(t, ok)
101+
err := ctl.EmitEvents(ctx)
102+
assert.NilError(t, err)
103+
104+
b := <-handlerBlock
105+
assert.Equal(t, b.Number.Uint64(), headersReorgBranch[idx].Number.Uint64(), fmt.Sprintf("block number equal for idx %d", idx))
106+
assert.Equal(t, b.BlockHash, headersReorgBranch[idx].Hash())
107+
idx++
108+
if idx == len(headersReorgBranch) {
109+
break
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)