
Commit 6bef0a5

sync2: add config validation (#6548)
## Motivation

Need to make sure syncv2 user-provided config is valid.
Parent: 471889f

File tree: 12 files changed, +294 -123 lines

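The Validate method itself is added in sync2's config file, which is among the 12 changed files but not shown on this page; the call sites below only reveal that it takes a *zap.Logger and returns a bool. A minimal hypothetical sketch of such a check, with the field set and types inferred from the hunks below (which limits are actually enforced, and the exact type of MaxDepth, are assumptions):

package sync2

import (
    "time"

    "go.uber.org/zap"
)

// Config mirrors only the fields referenced in the diffs on this page;
// the real struct lives in the sync2 package.
type Config struct {
    MaxDepth         uint
    BatchSize        uint
    MaxAttempts      uint
    MaxBatchRetries  uint
    FailedBatchDelay time.Duration
}

// Validate is a hypothetical sketch of the kind of check this commit adds:
// log every problem found and report overall validity as a bool.
func (c Config) Validate(logger *zap.Logger) bool {
    ok := true
    if c.BatchSize == 0 {
        logger.Error("sync2: BatchSize must be positive")
        ok = false
    }
    if c.MaxAttempts == 0 {
        logger.Error("sync2: MaxAttempts must be positive")
        ok = false
    }
    if c.FailedBatchDelay <= 0 {
        logger.Error("sync2: FailedBatchDelay must be positive")
        ok = false
    }
    return ok
}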

node/node.go

Lines changed: 4 additions & 1 deletion
@@ -807,7 +807,7 @@ func (app *App) initServices(ctx context.Context) error {
         app.Config.Sync.MalSync.MinSyncPeers = max(1, app.Config.P2P.MinPeers)
     }
     app.syncLogger = app.addLogger(SyncLogger, lg)
-    syncer := syncer.NewSyncer(
+    syncer, err := syncer.NewSyncer(
         app.cachedDB,
         app.clock,
         msh,
@@ -829,6 +829,9 @@ func (app *App) initServices(ctx context.Context) error {
         syncer.WithConfig(syncerConf),
         syncer.WithLogger(app.syncLogger.Zap()),
     )
+    if err != nil {
+        return fmt.Errorf("create syncer: %w", err)
+    }
     // TODO(dshulyak) this needs to be improved, but dependency graph is a bit complicated
     beaconProtocol.SetSyncState(syncer)
     hOracle.SetSync(syncer)

sync2/atxs.go

Lines changed: 32 additions & 34 deletions
@@ -5,7 +5,6 @@ import (
     "errors"
     "fmt"
     "sync"
-    "time"
 
     "github.com/jonboulle/clockwork"
     "github.com/libp2p/go-libp2p/core/host"
@@ -32,40 +31,33 @@ const (
 )
 
 type ATXHandler struct {
-    logger           *zap.Logger
-    f                Fetcher
-    clock            clockwork.Clock
-    batchSize        int
-    maxAttempts      int
-    maxBatchRetries  int
-    failedBatchDelay time.Duration
+    logger *zap.Logger
+    f      Fetcher
+    clock  clockwork.Clock
+    cfg    Config
 }
 
 var _ multipeer.SyncKeyHandler = &ATXHandler{}
 
 func NewATXHandler(
     logger *zap.Logger,
     f Fetcher,
-    batchSize, maxAttempts, maxBatchRetries int,
-    failedBatchDelay time.Duration,
+    cfg Config,
     clock clockwork.Clock,
 ) *ATXHandler {
     if clock == nil {
         clock = clockwork.NewRealClock()
     }
     return &ATXHandler{
-        f:                f,
-        logger:           logger,
-        clock:            clock,
-        batchSize:        batchSize,
-        maxAttempts:      maxAttempts,
-        maxBatchRetries:  maxBatchRetries,
-        failedBatchDelay: failedBatchDelay,
+        f:      f,
+        logger: logger,
+        clock:  clock,
+        cfg:    cfg,
     }
 }
 
 type commitState struct {
-    state         map[types.ATXID]int
+    state         map[types.ATXID]uint
     total         int
     numDownloaded int
     items         []types.ATXID
@@ -76,7 +68,7 @@ func (h *ATXHandler) setupState(
     base rangesync.OrderedSet,
     received rangesync.SeqResult,
 ) (*commitState, error) {
-    state := make(map[types.ATXID]int)
+    state := make(map[types.ATXID]uint)
     for k := range received.Seq {
         found, err := base.Has(k)
         if err != nil {
@@ -95,15 +87,15 @@ func (h *ATXHandler) setupState(
     return &commitState{
         state: state,
         total: len(state),
-        items: make([]types.ATXID, 0, h.batchSize),
+        items: make([]types.ATXID, 0, h.cfg.BatchSize),
     }, nil
 }
 
 func (h *ATXHandler) getAtxs(ctx context.Context, cs *commitState) (bool, error) {
     cs.items = cs.items[:0] // reuse the slice to reduce allocations
     for id := range cs.state {
         cs.items = append(cs.items, id)
-        if len(cs.items) == h.batchSize {
+        if uint(len(cs.items)) == h.cfg.BatchSize {
             break
         }
     }
@@ -121,7 +113,7 @@ func (h *ATXHandler) getAtxs(ctx context.Context, cs *commitState) (bool, error)
             h.logger.Debug("failed to download ATX",
                 zap.String("atx", id.ShortString()), zap.Error(err))
             delete(cs.state, id)
-        case cs.state[id] >= h.maxAttempts-1:
+        case cs.state[id] >= h.cfg.MaxAttempts-1:
             h.logger.Debug("failed to download ATX: max attempts reached",
                 zap.String("atx", id.ShortString()))
             delete(cs.state, id)
@@ -145,7 +137,7 @@ func (h *ATXHandler) Commit(
         return err
     }
     startTime := h.clock.Now()
-    batchAttemptsRemaining := h.maxBatchRetries
+    batchAttemptsRemaining := h.cfg.MaxBatchRetries
     for len(cs.state) > 0 {
         someSucceeded, err := h.getAtxs(ctx, cs)
         batchErr := &fetch.BatchError{}
@@ -162,17 +154,17 @@
             }
             batchAttemptsRemaining--
             h.logger.Debug("failed to download any ATXs: will retry batch",
-                zap.Int("remaining", batchAttemptsRemaining),
-                zap.Duration("delay", h.failedBatchDelay))
+                zap.Uint("remaining", batchAttemptsRemaining),
+                zap.Duration("delay", h.cfg.FailedBatchDelay))
             select {
             case <-ctx.Done():
                 return ctx.Err()
-            case <-h.clock.After(h.failedBatchDelay):
+            case <-h.clock.After(h.cfg.FailedBatchDelay):
                 continue
             }
         }
 
-        batchAttemptsRemaining = h.maxBatchRetries
+        batchAttemptsRemaining = h.cfg.MaxBatchRetries
         elapsed := h.clock.Since(startTime)
         h.logger.Debug("fetched atxs",
             zap.Int("total", cs.total),
@@ -197,14 +189,17 @@ func NewMultiEpochATXSyncer(
     hss HashSyncSource,
     oldCfg, newCfg Config,
     parallelLoadLimit int,
-) *MultiEpochATXSyncer {
+) (*MultiEpochATXSyncer, error) {
+    if !oldCfg.Validate(logger) || !newCfg.Validate(logger) {
+        return nil, errors.New("invalid config")
+    }
     return &MultiEpochATXSyncer{
         logger:            logger,
         oldCfg:            oldCfg,
         newCfg:            newCfg,
         parallelLoadLimit: parallelLoadLimit,
         hss:               hss,
-    }
+    }, nil
 }
 
 func (s *MultiEpochATXSyncer) load(newEpoch types.EpochID) error {
@@ -226,7 +221,10 @@ func (s *MultiEpochATXSyncer) load(newEpoch types.EpochID) error {
         if epoch == newEpoch {
             cfg = s.newCfg
         }
-        hs := s.hss.CreateHashSync(name, cfg, epoch)
+        hs, err := s.hss.CreateHashSync(name, cfg, epoch)
+        if err != nil {
+            return fmt.Errorf("create ATX syncer for epoch %d: %w", epoch, err)
+        }
         if err := hs.Load(); err != nil {
             return fmt.Errorf("load ATX syncer for epoch %d: %w", epoch, err)
         }
@@ -303,9 +301,9 @@ func NewATXSyncer(
     peers *peers.Peers,
     epoch types.EpochID,
     enableActiveSync bool,
-) *P2PHashSync {
-    curSet := dbset.NewDBSet(db, atxsTable(epoch), 32, cfg.MaxDepth)
-    handler := NewATXHandler(logger, f, cfg.BatchSize, cfg.MaxAttempts, cfg.MaxBatchRetries, cfg.FailedBatchDelay, nil)
+) (*P2PHashSync, error) {
+    curSet := dbset.NewDBSet(db, atxsTable(epoch), 32, int(cfg.MaxDepth))
+    handler := NewATXHandler(logger, f, cfg, nil)
     return NewP2PHashSync(logger, d, name, curSet, 32, peers, handler, cfg, enableActiveSync)
 }
 
@@ -338,6 +336,6 @@ func NewATXSyncSource(
 }
 
 // CreateHashSync implements HashSyncSource.
-func (as *ATXSyncSource) CreateHashSync(name string, cfg Config, epoch types.EpochID) HashSync {
+func (as *ATXSyncSource) CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {
     return NewATXSyncer(as.logger.Named(name), as.d, name, cfg, as.db, as.f, as.peers, epoch, as.enableActiveSync)
 }
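One concrete reason the up-front validation matters after the switch to unsigned counters: with cfg.MaxAttempts as a uint, a zero value would make MaxAttempts-1 wrap around, so the `case cs.state[id] >= h.cfg.MaxAttempts-1` branch above could effectively never fire. A standalone illustration (not from the commit; whether the real Validate guards this exact field is not shown here):

package main

import "fmt"

func main() {
    // Unsigned arithmetic wraps: with maxAttempts == 0, maxAttempts-1 becomes
    // the maximum uint value, so the "max attempts reached" comparison would
    // essentially never be true. Validating the config up front avoids
    // relying on this silent wraparound.
    var maxAttempts uint = 0
    var attempts uint = 5
    fmt.Println(maxAttempts - 1)           // 18446744073709551615 on 64-bit platforms
    fmt.Println(attempts >= maxAttempts-1) // false
}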

sync2/atxs_test.go

Lines changed: 19 additions & 41 deletions
@@ -36,13 +36,14 @@ func atxSeqResult(atxs []types.ATXID) rangesync.SeqResult {
     }
 }
 
+var testCfg = sync2.Config{
+    BatchSize:        4,
+    MaxAttempts:      3,
+    MaxBatchRetries:  2,
+    FailedBatchDelay: 10 * time.Second,
+}
+
 func TestAtxHandler_Success(t *testing.T) {
-    const (
-        batchSize       = 4
-        maxAttempts     = 3
-        maxBatchRetries = 2
-        batchRetryDelay = 10 * time.Second
-    )
     ctrl := gomock.NewController(t)
     allAtxs := make([]types.ATXID, 10)
     logger := zaptest.NewLogger(t)
@@ -52,7 +53,7 @@ func TestAtxHandler_Success(t *testing.T) {
     }
     f := NewMockFetcher(ctrl)
     clock := clockwork.NewFakeClock()
-    h := sync2.NewATXHandler(logger, f, batchSize, maxAttempts, maxBatchRetries, batchRetryDelay, clock)
+    h := sync2.NewATXHandler(logger, f, testCfg, clock)
     baseSet := mocks.NewMockOrderedSet(ctrl)
     for _, id := range allAtxs {
         baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes()))
@@ -84,12 +85,6 @@ func TestAtxHandler_Success(t *testing.T) {
 }
 
 func TestAtxHandler_Retry(t *testing.T) {
-    const (
-        batchSize       = 4
-        maxAttempts     = 3
-        maxBatchRetries = 2
-        batchRetryDelay = 10 * time.Second
-    )
     ctrl := gomock.NewController(t)
     allAtxs := make([]types.ATXID, 10)
     logger := zaptest.NewLogger(t)
@@ -99,7 +94,7 @@ func TestAtxHandler_Retry(t *testing.T) {
     }
     f := NewMockFetcher(ctrl)
     clock := clockwork.NewFakeClock()
-    h := sync2.NewATXHandler(logger, f, batchSize, maxAttempts, maxBatchRetries, batchRetryDelay, clock)
+    h := sync2.NewATXHandler(logger, f, testCfg, clock)
     baseSet := mocks.NewMockOrderedSet(ctrl)
     for _, id := range allAtxs {
         baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes()))
@@ -159,7 +154,7 @@
                 if ctx.Err() != nil {
                     return nil
                 }
-                clock.Advance(batchRetryDelay)
+                clock.Advance(testCfg.FailedBatchDelay)
             }
         })
 
@@ -170,19 +165,13 @@
 }
 
 func TestAtxHandler_Cancel(t *testing.T) {
-    const (
-        batchSize       = 4
-        maxAttempts     = 3
-        maxBatchRetries = 2
-        batchRetryDelay = 10 * time.Second
-    )
     atxID := types.RandomATXID()
     ctrl := gomock.NewController(t)
     logger := zaptest.NewLogger(t)
     peer := p2p.Peer("foobar")
     f := NewMockFetcher(ctrl)
     clock := clockwork.NewFakeClock()
-    h := sync2.NewATXHandler(logger, f, batchSize, maxAttempts, maxBatchRetries, batchRetryDelay, clock)
+    h := sync2.NewATXHandler(logger, f, testCfg, clock)
     baseSet := mocks.NewMockOrderedSet(ctrl)
     baseSet.EXPECT().Has(rangesync.KeyBytes(atxID.Bytes())).Return(false, nil)
     f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{atxID.Hash32()})
@@ -200,12 +189,6 @@ func TestAtxHandler_Cancel(t *testing.T) {
 }
 
 func TestAtxHandler_BatchRetry(t *testing.T) {
-    const (
-        batchSize       = 4
-        maxAttempts     = 3
-        maxBatchRetries = 2
-        batchRetryDelay = 10 * time.Second
-    )
     ctrl := gomock.NewController(t)
     allAtxs := make([]types.ATXID, 10)
     logger := zaptest.NewLogger(t)
@@ -215,7 +198,7 @@ func TestAtxHandler_BatchRetry(t *testing.T) {
     }
     clock := clockwork.NewFakeClock()
     f := NewMockFetcher(ctrl)
-    h := sync2.NewATXHandler(logger, f, batchSize, maxAttempts, maxBatchRetries, batchRetryDelay, clock)
+    h := sync2.NewATXHandler(logger, f, testCfg, clock)
     baseSet := mocks.NewMockOrderedSet(ctrl)
     for _, id := range allAtxs {
         baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes()))
@@ -249,18 +232,12 @@
             }
             return nil
         }).Times(3)
-    clock.Advance(batchRetryDelay)
+    clock.Advance(testCfg.FailedBatchDelay)
     require.NoError(t, eg.Wait())
     require.Empty(t, toFetch)
 }
 
 func TestAtxHandler_BatchRetry_Fail(t *testing.T) {
-    const (
-        batchSize       = 4
-        maxAttempts     = 3
-        maxBatchRetries = 2
-        batchRetryDelay = 10 * time.Second
-    )
     ctrl := gomock.NewController(t)
     allAtxs := make([]types.ATXID, 10)
     logger := zaptest.NewLogger(t)
@@ -270,7 +247,7 @@ func TestAtxHandler_BatchRetry_Fail(t *testing.T) {
     }
     clock := clockwork.NewFakeClock()
     f := NewMockFetcher(ctrl)
-    h := sync2.NewATXHandler(logger, f, batchSize, maxAttempts, maxBatchRetries, batchRetryDelay, clock)
+    h := sync2.NewATXHandler(logger, f, testCfg, clock)
     baseSet := mocks.NewMockOrderedSet(ctrl)
     for _, id := range allAtxs {
         baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes()))
@@ -296,7 +273,7 @@ func TestAtxHandler_BatchRetry_Fail(t *testing.T) {
     })
     for range 2 {
         clock.BlockUntil(1)
-        clock.Advance(batchRetryDelay)
+        clock.Advance(testCfg.FailedBatchDelay)
     }
     require.Error(t, eg.Wait())
 }
@@ -309,7 +286,8 @@ func TestMultiEpochATXSyncer(t *testing.T) {
     newCfg := sync2.DefaultConfig()
     newCfg.MaxDepth = 24
     hss := NewMockHashSyncSource(ctrl)
-    mhs := sync2.NewMultiEpochATXSyncer(logger, hss, oldCfg, newCfg, 1)
+    mhs, err := sync2.NewMultiEpochATXSyncer(logger, hss, oldCfg, newCfg, 1)
+    require.NoError(t, err)
     ctx := context.Background()
 
     lastSynced, err := mhs.EnsureSync(ctx, 0, 0)
@@ -319,7 +297,7 @@
     var syncActions []string
     curIdx := 0
     hss.EXPECT().CreateHashSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
-        func(name string, cfg sync2.Config, epoch types.EpochID) sync2.HashSync {
+        func(name string, cfg sync2.Config, epoch types.EpochID) (sync2.HashSync, error) {
             idx := curIdx
             curIdx++
             syncActions = append(syncActions,
@@ -339,7 +317,7 @@
             hs.EXPECT().Stop().DoAndReturn(func() {
                 syncActions = append(syncActions, fmt.Sprintf("stop %d %s", idx, name))
             }).AnyTimes()
-            return hs
+            return hs, nil
         }).AnyTimes()
 
     // Last wait epoch 3, new epoch 3
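Since TestMultiEpochATXSyncer builds its configs from sync2.DefaultConfig() and now requires NewMultiEpochATXSyncer to succeed, the defaults must pass validation. A tiny hypothetical sanity test (not part of this commit, assuming the same imports as the surrounding file) that states this directly:

func TestDefaultConfigIsValid(t *testing.T) {
    // DefaultConfig must validate cleanly, otherwise the constructor above
    // could not return without error.
    require.True(t, sync2.DefaultConfig().Validate(zaptest.NewLogger(t)))
}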

sync2/interface.go

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ type HashSync interface {
 }
 
 type HashSyncSource interface {
-    CreateHashSync(name string, cfg Config, epoch types.EpochID) HashSync
+    CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error)
 }
 
 type LayerTicker interface {
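This interface change ripples to every implementation and mock: CreateHashSync now reports construction failures (for example, an invalid Config) instead of always returning a HashSync. A hypothetical adapter, not part of this commit, that shows the updated contract in isolation: any function with the two-value signature can act as a HashSyncSource.

// Hypothetical adapter in package sync2; only the signature above is assumed.
type HashSyncSourceFunc func(name string, cfg Config, epoch types.EpochID) (HashSync, error)

func (f HashSyncSourceFunc) CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {
    return f(name, cfg, epoch)
}

// Compile-time check that the adapter satisfies the interface.
var _ HashSyncSource = HashSyncSourceFunc(nil)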
