Low nonce consumption (#229)
* Fine-tune handling when nonce consumption is slower than the get nonces interval

* lint

* fix test

* make tests less flaky

* fix test race
agouin committed Dec 1, 2023
1 parent 6a974bb commit 4fe383e
Showing 3 changed files with 192 additions and 24 deletions.
98 changes: 79 additions & 19 deletions signer/cosigner_nonce_cache.go
@@ -14,7 +14,6 @@ const (
defaultGetNoncesInterval = 3 * time.Second
defaultGetNoncesTimeout = 4 * time.Second
defaultNonceExpiration = 10 * time.Second // half of the local cosigner cache expiration
targetTrim = 10
)

type CosignerNonceCache struct {
@@ -25,7 +24,6 @@ type CosignerNonceCache struct {

lastReconcileNonces lastCount
lastReconcileTime time.Time
noncesPerMinute float64

getNoncesInterval time.Duration
getNoncesTimeout time.Duration
@@ -36,6 +34,56 @@ type CosignerNonceCache struct {
cache NonceCache

pruner NonceCachePruner

movingAverage *movingAverage

empty chan struct{}
}

type movingAverageItem struct {
timeSinceLastReconcile time.Duration
noncesPerMinute float64
}

type movingAverage struct {
items []movingAverageItem
period time.Duration
}

func newMovingAverage(period time.Duration) *movingAverage {
return &movingAverage{period: period}
}

func (m *movingAverage) add(
timeSinceLastReconcile time.Duration,
noncesPerMinute float64,
) {
duration := timeSinceLastReconcile
keep := len(m.items) - 1
for i, e := range m.items {
duration += e.timeSinceLastReconcile
if duration >= m.period {
keep = i
break
}
}
m.items = append(
[]movingAverageItem{{timeSinceLastReconcile: timeSinceLastReconcile, noncesPerMinute: noncesPerMinute}},
m.items[:keep+1]...,
)
}

func (m *movingAverage) average() float64 {
weightedSum := float64(0)
duration := float64(0)

for _, e := range m.items {
d := float64(e.timeSinceLastReconcile)
weightedSum += e.noncesPerMinute * d
duration += d
}

return weightedSum / duration
}

type lastCount struct {
@@ -126,6 +174,8 @@ func NewCosignerNonceCache(
nonceExpiration: nonceExpiration,
threshold: threshold,
pruner: pruner,
empty: make(chan struct{}, 1),
movingAverage: newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
}
// the only time pruner is expected to be non-nil is during tests, otherwise we use the cache logic.
if pruner == nil {
@@ -143,8 +193,12 @@ func (cnc *CosignerNonceCache) getUuids(n int) []uuid.UUID {
return uuids
}

func (cnc *CosignerNonceCache) target() int {
return int((cnc.noncesPerMinute/60)*cnc.getNoncesInterval.Seconds()*1.2) + int(cnc.noncesPerMinute/30) + targetTrim
func (cnc *CosignerNonceCache) target(noncesPerMinute float64) int {
t := int((noncesPerMinute / 60) * ((cnc.getNoncesInterval.Seconds() * 1.2) + 0.5))
if t <= 0 {
return 1 // always target at least one nonce ready
}
return t
}

func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
@@ -164,33 +218,31 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
noncesPerMin = 0
}

if cnc.noncesPerMinute == 0 {
// initialize nonces per minute for weighted average
cnc.noncesPerMinute = noncesPerMin
} else {
// weighted average over last 4 intervals
cnc.noncesPerMinute = (cnc.noncesPerMinute*3 + noncesPerMin) / 4
}

defer func() {
cnc.lastReconcileNonces.Set(cnc.cache.Size())
cnc.lastReconcileTime = time.Now()
}()
cnc.movingAverage.add(timeSinceLastReconcile, noncesPerMin)

// calculate how many nonces we need to load to keep up with demand
// load 120% the number of nonces we need to keep up with demand,
// plus a couple seconds worth of nonces to account for nonce consumption during LoadN
// plus 10 for padding

t := cnc.target()
avgNoncesPerMin := cnc.movingAverage.average()
t := cnc.target(avgNoncesPerMin)
additional := t - remainingNonces

defer func() {
cnc.lastReconcileNonces.Set(remainingNonces + additional)
cnc.lastReconcileTime = time.Now()
}()

if additional <= 0 {
additional = 0
// we're ahead of demand, don't load any more
cnc.logger.Debug(
"Cosigner nonce cache ahead of demand",
"target", t,
"remaining", remainingNonces,
"noncesPerMin", cnc.noncesPerMinute,
"nonces_per_min", noncesPerMin,
"avg_nonces_per_min", avgNoncesPerMin,
)
return
}
@@ -200,7 +252,8 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
"target", t,
"remaining", remainingNonces,
"additional", additional,
"noncesPerMin", cnc.noncesPerMinute,
"nonces_per_min", noncesPerMin,
"avg_nonces_per_min", avgNoncesPerMin,
)

cnc.LoadN(ctx, additional)
@@ -282,6 +335,8 @@ func (cnc *CosignerNonceCache) Start(ctx context.Context) {
return
case <-ticker.C:
cnc.reconcile(ctx)
case <-cnc.empty:
cnc.reconcile(ctx)
}
}
}
@@ -310,6 +365,11 @@ CheckNoncesLoop:
// remove this set of nonces from the cache
cnc.cache.Delete(i)

if len(cnc.cache.cache) == 0 {
cnc.logger.Debug("Nonce cache is empty, triggering reload")
cnc.empty <- struct{}{}
}

// all peers found
return &CosignerUUIDNonces{
UUID: cn.UUID,
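
For readers skimming the diff: the new target() sizes the cache at 120% of one get-nonces interval's expected demand, plus half a second of demand to cover nonces consumed while LoadN is in flight, and reconcile() now feeds a duration-weighted moving average over a four-interval window instead of the old exponentially weighted noncesPerMinute field. A minimal standalone sketch of that arithmetic, in the spirit of the commit but with illustrative sample rates that are not taken from the repository:

	// Sketch (not part of the commit) of the new cache-sizing math.
	package main

	import (
		"fmt"
		"time"
	)

	// target mirrors the formula added above: 120% of one interval's expected
	// demand, plus half a second of demand for consumption during LoadN.
	func target(getNoncesInterval time.Duration, noncesPerMinute float64) int {
		t := int((noncesPerMinute / 60) * ((getNoncesInterval.Seconds() * 1.2) + 0.5))
		if t <= 0 {
			return 1 // always keep at least one nonce ready
		}
		return t
	}

	func main() {
		// 600 nonces/min at the default 3s interval:
		// (600/60) * (3*1.2 + 0.5) = 10 * 4.1 = 41 nonces targeted.
		fmt.Println(target(3*time.Second, 600)) // 41

		// Duration-weighted average, matching the first two samples in
		// TestMovingAverage below: (500*3 + 100*3) / (3+3) = 300 nonces/min.
		fmt.Println((500.0*3 + 100.0*3) / 6) // 300
	}

The commit also wires an empty channel into the reconcile loop so that a drained cache triggers an immediate reload instead of waiting out the remainder of the ticker interval; see the sketch after the tests below.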
116 changes: 111 additions & 5 deletions signer/cosigner_nonce_cache_test.go
@@ -22,6 +22,39 @@ func TestNonceCache(_ *testing.T) {
nc.Delete(0)
}

func TestMovingAverage(t *testing.T) {
ma := newMovingAverage(12 * time.Second)

ma.add(3*time.Second, 500)
require.Len(t, ma.items, 1)
require.Equal(t, float64(500), ma.average())

ma.add(3*time.Second, 100)
require.Len(t, ma.items, 2)
require.Equal(t, float64(300), ma.average())

ma.add(6*time.Second, 600)
require.Len(t, ma.items, 3)
require.Equal(t, float64(450), ma.average())

// should kick out the first one
ma.add(3*time.Second, 500)
require.Len(t, ma.items, 3)
require.Equal(t, float64(450), ma.average())

// should kick out the second one
ma.add(6*time.Second, 500)
require.Len(t, ma.items, 3)
require.Equal(t, float64(540), ma.average())

for i := 0; i < 5; i++ {
ma.add(2500*time.Millisecond, 1000)
}

require.Len(t, ma.items, 5)
require.Equal(t, float64(1000), ma.average())
}

func TestClearNonces(t *testing.T) {
lcs, _ := getTestLocalCosigners(t, 2, 3)
cosigners := make([]Cosigner, len(lcs))
@@ -131,10 +164,12 @@ func TestNonceCacheDemand(t *testing.T) {

cancel()

require.LessOrEqual(t, size, nonceCache.target())
require.LessOrEqual(t, size, nonceCache.target(nonceCache.movingAverage.average()))

count, pruned := mp.Result()

require.Greater(t, mp.count, 0)
require.Equal(t, 0, mp.pruned)
require.Greater(t, count, 0)
require.Equal(t, 0, pruned)
}

func TestNonceCacheExpiration(t *testing.T) {
@@ -180,6 +215,77 @@ func TestNonceCacheExpiration(t *testing.T) {

cancel()

// the cache should have at most the trim padding since no nonces are being consumed.
require.LessOrEqual(t, nonceCache.cache.Size(), targetTrim)
// the cache should be empty or 1 since no nonces are being consumed.
require.LessOrEqual(t, nonceCache.cache.Size(), 1)
}

func TestNonceCacheDemandSlow(t *testing.T) {
lcs, _ := getTestLocalCosigners(t, 2, 3)
cosigners := make([]Cosigner, len(lcs))
for i, lc := range lcs {
cosigners[i] = lc
}

nonceCache := NewCosignerNonceCache(
cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
cosigners,
&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
90*time.Millisecond,
100*time.Millisecond,
500*time.Millisecond,
2,
nil,
)

ctx, cancel := context.WithCancel(context.Background())

go nonceCache.Start(ctx)

for i := 0; i < 10; i++ {
time.Sleep(200 * time.Millisecond)
require.Greater(t, nonceCache.cache.Size(), 0)
_, err := nonceCache.GetNonces([]Cosigner{cosigners[0], cosigners[1]})
require.NoError(t, err)
}

cancel()

require.LessOrEqual(t, nonceCache.cache.Size(), nonceCache.target(300))
}

func TestNonceCacheDemandSlowDefault(t *testing.T) {
if testing.Short() {
t.Skip()
}
lcs, _ := getTestLocalCosigners(t, 2, 3)
cosigners := make([]Cosigner, len(lcs))
for i, lc := range lcs {
cosigners[i] = lc
}

nonceCache := NewCosignerNonceCache(
cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
cosigners,
&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
defaultGetNoncesInterval,
defaultGetNoncesTimeout,
defaultNonceExpiration,
2,
nil,
)

ctx, cancel := context.WithCancel(context.Background())

go nonceCache.Start(ctx)

for i := 0; i < 10; i++ {
time.Sleep(7 * time.Second)
require.Greater(t, nonceCache.cache.Size(), 0)
_, err := nonceCache.GetNonces([]Cosigner{cosigners[0], cosigners[1]})
require.NoError(t, err)
}

cancel()

require.LessOrEqual(t, nonceCache.cache.Size(), nonceCache.target(60/7))
}
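
A note on the bounds in the two slow-consumption tests above: consuming one nonce per 200ms sleep is 60/0.2 = 300 nonces per minute, hence target(300); one nonce per 7s is 60/7 ≈ 8.6 nonces per minute, hence target(60/7). The reload-on-empty behavior these tests lean on is a buffered channel of capacity one: GetNonces signals it when the cache drains, and the Start loop treats the signal like an early tick. A minimal standalone sketch of that pattern, with illustrative names that are not from the repository:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		empty := make(chan struct{}, 1) // capacity 1: at most one pending nudge
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()

		// Consumer side: signal that the cache just drained.
		go func() {
			time.Sleep(100 * time.Millisecond)
			empty <- struct{}{} // wakes the loop before the next tick fires
		}()

		for i := 0; i < 2; i++ {
			select {
			case <-ticker.C:
				fmt.Println("reconcile: periodic tick")
			case <-empty:
				fmt.Println("reconcile: cache drained, reloading early")
			}
		}
	}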
2 changes: 2 additions & 0 deletions signer/threshold_validator_test.go
@@ -400,6 +400,8 @@ func testThresholdValidatorLeaderElection(t *testing.T, threshold, total uint8)

err := tv.LoadSignStateIfNecessary(testChainID)
require.NoError(t, err)

require.NoError(t, tv.Start(ctx))
}

quit := make(chan bool)
