Skip to content

Commit b7420c5

Browse files
authored
Merge pull request #20683 from henrybear327/robustness-test-introduce-cont-watch-creation-config
Implement a feature to periodically open watches in robustness test and make it configurable
2 parents e246aea + 2d451d0 commit b7420c5

File tree

5 files changed

+105
-11
lines changed

5 files changed

+105
-11
lines changed

tests/antithesis/test-template/robustness/traffic/main.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"go.etcd.io/etcd/tests/v3/antithesis/test-template/robustness/common"
3434
"go.etcd.io/etcd/tests/v3/robustness/client"
3535
"go.etcd.io/etcd/tests/v3/robustness/identity"
36+
"go.etcd.io/etcd/tests/v3/robustness/options"
3637
robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random"
3738
"go.etcd.io/etcd/tests/v3/robustness/report"
3839
"go.etcd.io/etcd/tests/v3/robustness/traffic"
@@ -46,6 +47,10 @@ var (
4647
MemberClientCount: 3,
4748
ClusterClientCount: 1,
4849
MaxNonUniqueRequestConcurrency: 3,
50+
BackgroundWatchConfig: options.BackgroundWatchConfig{
51+
Interval: 0,
52+
RevisionOffset: 0,
53+
},
4954
}
5055
trafficNames = []string{
5156
"etcd",
@@ -121,11 +126,12 @@ func runTraffic(ctx context.Context, lg *zap.Logger, tf traffic.Traffic, hosts [
121126
defer watchSet.Close()
122127
g.Go(func() error {
123128
err := client.CollectClusterWatchEvents(ctx, client.CollectClusterWatchEventsParam{
124-
Lg: lg,
125-
Endpoints: hosts,
126-
MaxRevisionChan: maxRevisionChan,
127-
Cfg: watchConfig,
128-
ClientSet: watchSet,
129+
Lg: lg,
130+
Endpoints: hosts,
131+
MaxRevisionChan: maxRevisionChan,
132+
Cfg: watchConfig,
133+
ClientSet: watchSet,
134+
BackgroundWatchConfig: profile.BackgroundWatchConfig,
129135
})
130136
return err
131137
})

tests/robustness/client/watch.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"time"
2122

2223
"go.uber.org/zap"
2324
"golang.org/x/sync/errgroup"
2425

26+
"go.etcd.io/etcd/tests/v3/robustness/options"
2527
"go.etcd.io/etcd/tests/v3/robustness/report"
2628
)
2729

@@ -31,6 +33,7 @@ type CollectClusterWatchEventsParam struct {
3133
MaxRevisionChan <-chan int64
3234
Cfg WatchConfig
3335
ClientSet *ClientSet
36+
options.BackgroundWatchConfig
3437
}
3538

3639
func CollectClusterWatchEvents(ctx context.Context, param CollectClusterWatchEventsParam) error {
@@ -51,14 +54,29 @@ func CollectClusterWatchEvents(ctx context.Context, param CollectClusterWatchEve
5154
return err
5255
})
5356
}
54-
57+
finish := make(chan struct{})
5558
g.Go(func() error {
5659
maxRevision := <-param.MaxRevisionChan
5760
for _, memberChan := range memberMaxRevisionChans {
5861
memberChan <- maxRevision
5962
}
63+
close(finish)
6064
return nil
6165
})
66+
67+
if param.BackgroundWatchConfig.Interval > 0 {
68+
for _, endpoint := range param.Endpoints {
69+
g.Go(func() error {
70+
c, err := param.ClientSet.NewClient([]string{endpoint})
71+
if err != nil {
72+
return err
73+
}
74+
defer c.Close()
75+
return openWatchPeriodically(ctx, &g, c, param.BackgroundWatchConfig, finish)
76+
})
77+
}
78+
}
79+
6280
return g.Wait()
6381
}
6482

@@ -130,3 +148,38 @@ resetWatch:
130148
}
131149
}
132150
}
151+
152+
func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingClient, backgroundWatchConfig options.BackgroundWatchConfig, finish <-chan struct{}) error {
153+
for {
154+
select {
155+
case <-ctx.Done():
156+
return ctx.Err()
157+
case <-finish:
158+
return nil
159+
case <-time.After(backgroundWatchConfig.Interval):
160+
}
161+
g.Go(func() error {
162+
resp, err := c.Get(ctx, "/key")
163+
if err != nil {
164+
return err
165+
}
166+
rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset
167+
168+
watchCtx, cancel := context.WithCancel(ctx)
169+
defer cancel()
170+
w := c.Watch(watchCtx, "", rev, true, true, true)
171+
for {
172+
select {
173+
case <-ctx.Done():
174+
return ctx.Err()
175+
case <-finish:
176+
return nil
177+
case _, ok := <-w:
178+
if !ok {
179+
return nil
180+
}
181+
}
182+
}
183+
})
184+
}
185+
}

tests/robustness/main_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,12 @@ func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg
168168
g.Go(func() error {
169169
endpoints := processEndpoints(clus)
170170
err := client.CollectClusterWatchEvents(ctx, client.CollectClusterWatchEventsParam{
171-
Lg: lg,
172-
Endpoints: endpoints,
173-
MaxRevisionChan: maxRevisionChan,
174-
Cfg: s.Watch,
175-
ClientSet: watchSet,
171+
Lg: lg,
172+
Endpoints: endpoints,
173+
MaxRevisionChan: maxRevisionChan,
174+
Cfg: s.Watch,
175+
ClientSet: watchSet,
176+
BackgroundWatchConfig: s.Profile.BackgroundWatchConfig,
176177
})
177178
return err
178179
})
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package options
16+
17+
import "time"
18+
19+
type BackgroundWatchConfig struct {
20+
Interval time.Duration
21+
RevisionOffset int64
22+
}

tests/robustness/traffic/traffic.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.etcd.io/etcd/tests/v3/robustness/client"
2929
"go.etcd.io/etcd/tests/v3/robustness/identity"
3030
"go.etcd.io/etcd/tests/v3/robustness/model"
31+
"go.etcd.io/etcd/tests/v3/robustness/options"
3132
"go.etcd.io/etcd/tests/v3/robustness/report"
3233
"go.etcd.io/etcd/tests/v3/robustness/validate"
3334
)
@@ -295,6 +296,7 @@ type Profile struct {
295296
ClusterClientCount int
296297
ForbidCompaction bool
297298
CompactPeriod time.Duration
299+
options.BackgroundWatchConfig
298300
}
299301

300302
func (p Profile) WithoutCompaction() Profile {
@@ -307,6 +309,16 @@ func (p Profile) WithCompactionPeriod(cp time.Duration) Profile {
307309
return p
308310
}
309311

312+
func (p Profile) WithBackgroundWatchConfigInterval(interval time.Duration) Profile {
313+
p.BackgroundWatchConfig.Interval = interval
314+
return p
315+
}
316+
317+
func (p Profile) WithBackgroundWatchConfigRevisionOffset(offset int64) Profile {
318+
p.BackgroundWatchConfig.RevisionOffset = offset
319+
return p
320+
}
321+
310322
type RunTrafficLoopParam struct {
311323
Client *client.RecordingClient
312324
QPSLimiter *rate.Limiter

0 commit comments

Comments
 (0)