Skip to content

Commit f38f9f1

Browse files
authored
Merge pull request #15 from vimeo/refresh_lock_abort_loop_leased
refreshLock: abort loop on term overruns
2 parents bb1327f + e4054e5 commit f38f9f1

File tree

5 files changed

+386
-34
lines changed

5 files changed

+386
-34
lines changed

.github/workflows/go.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@ jobs:
88
strategy:
99
matrix:
1010
os: [macOS-latest, ubuntu-latest]
11-
goversion: [1.13, 1.14, 1.15]
11+
goversion: [1.17, 1.18, 1.19, '1.20']
1212
steps:
1313

1414
- name: Set up Go ${{matrix.goversion}} on ${{matrix.os}}
15-
uses: actions/setup-go@v1
15+
uses: actions/setup-go@v3
1616
with:
1717
go-version: ${{matrix.goversion}}
1818
id: go
1919

2020
- name: Check out code into the Go module directory
21-
uses: actions/checkout@v1
21+
uses: actions/checkout@v3
2222

2323
- name: gofmt
2424
run: |

campaign.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ func (c *campaign) manageWin(ctx context.Context, winningEntry *entry.RaceEntry,
145145
defer func() { wg.Add(1); go func() { defer wg.Done(); c.c.OnOusting(ctx) }() }()
146146
}
147147
defer electedCancel()
148+
// Wait for any outstanding goroutines before running OnElected (in particular, wait for
149+
// previous OnOusting callbacks to complete.
150+
wg.Wait()
151+
148152
wg.Add(1)
149153
go func(ctx context.Context) { defer wg.Done(); c.c.OnElected(ctx, &tv) }(electedCtx)
150154
finalEntry, refreshErr := c.refreshLock(ctx, winningEntry, &tv)
@@ -224,14 +228,27 @@ func (c *campaign) refreshLock(ctx context.Context, electedEntry *entry.RaceEntr
224228
entry := &entryVal
225229
// we have the lock, we should try to keep it
226230
for {
231+
// Check whether we still have some time left on the term before we try to do any
232+
// real work (if we're past the end of the term, we've lost and need to reacquire
233+
// anyway)
234+
if c.clock.Until(entry.TermExpiry) < 0 {
235+
return entry, context.DeadlineExceeded
236+
}
227237
newEntry, refreshErr := c.refreshLockOnce(ctx, entry)
228238
switch refreshErr {
229239
case nil:
230240
tv.Set(newEntry.TermExpiry)
231241
entry = newEntry
232-
case context.DeadlineExceeded, context.Canceled, errAcquireFailed:
242+
case context.Canceled, errAcquireFailed:
233243
return entry, refreshErr
234244
default:
245+
if errors.Is(refreshErr, context.Canceled) {
246+
return entry, context.Canceled
247+
}
248+
// Check whether we still have some time left on the term before we try to sleep
249+
if c.clock.Until(entry.TermExpiry) < 0 {
250+
return entry, context.DeadlineExceeded
251+
}
235252
}
236253

237254
// wake up in half the interval until the leadership term

0 commit comments

Comments
 (0)