forked from antrea-io/antrea
-
Notifications
You must be signed in to change notification settings - Fork 0
/
performance_test.go
334 lines (299 loc) · 12.9 KB
/
performance_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
// Copyright 2019 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"flag"
"fmt"
"math/rand"
"net/url"
"strings"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
networkv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"antrea.io/antrea/pkg/agent/openflow"
)
const (
seed = 0xA1E47 // Use a specific rand seed to make the generated workloads always same
perfTestAppLabel = "antrea-perf-test"
podsConnectionNetworkPolicyName = "pods.ingress"
workloadNetworkPolicyName = "workloads.ingress"
)
var (
benchNginxPodName = randName(nginxContainerName + "-")
toolboxPodName = randName(toolboxContainerName + "-")
customizeRequests = flag.Int("perf.http.requests", 0, "Number of http requests")
customizePolicyRules = flag.Int("perf.http.policy_rules", 0, "Number of CIDRs in the network policy")
httpConcurrency = flag.Int("perf.http.concurrency", 1, "Number of multiple requests to make at a time")
realizeTimeout = flag.Duration("perf.realize.timeout", 5*time.Minute, "Timeout of the realization of network policies")
labelSelector = &metav1.LabelSelector{
MatchLabels: map[string]string{"app": perfTestAppLabel},
}
)
func BenchmarkHTTPRequest(b *testing.B) {
skipIfNotIPv4Cluster(b)
for _, scale := range []struct{ requests, policyRules int }{
{100000, 0},
{1000000, 0},
{100000, 5000},
{100000, 10000},
{100000, 15000},
} {
b.Run(fmt.Sprintf("Request:%d,PolicyRules:%d", scale.requests, scale.policyRules), func(b *testing.B) {
withPerfTestSetup(func(data *TestData) { httpRequest(scale.requests, scale.policyRules, data, b) }, b)
})
}
}
func BenchmarkRealizeNetworkPolicy(b *testing.B) {
skipIfNotIPv4Cluster(b)
for _, policyRules := range []int{5000, 10000, 15000} {
b.Run(fmt.Sprintf("RealizeNetworkPolicy%d", policyRules), func(b *testing.B) {
withPerfTestSetup(func(data *TestData) { networkPolicyRealize(policyRules, data, b) }, b)
})
}
}
func BenchmarkCustomizeHTTPRequest(b *testing.B) {
skipIfNotIPv4Cluster(b)
if *customizeRequests == 0 {
b.Skip("The value of perf.http.requests=0, skipped")
}
withPerfTestSetup(func(data *TestData) { httpRequest(*customizeRequests, *customizePolicyRules, data, b) }, b)
}
func BenchmarkCustomizeRealizeNetworkPolicy(b *testing.B) {
skipIfNotIPv4Cluster(b)
if *customizePolicyRules == 0 {
b.Skip("The value of perf.http.policy_rules=0, skipped")
}
withPerfTestSetup(func(data *TestData) { networkPolicyRealize(*customizePolicyRules, data, b) }, b)
}
func randCidr(rnd *rand.Rand) string {
getByte := func() int {
return rnd.Intn(255) + 1
}
return fmt.Sprintf("%d.%d.%d.%d/32", getByte(), getByte(), getByte(), getByte())
}
// createPerfTestPodDefinition creates the Pod specification for the perf test.
// The Pod will be scheduled on the control-plane Node.
func createPerfTestPodDefinition(name, containerName, image string) *corev1.Pod {
podSpec := corev1.PodSpec{
Containers: []corev1.Container{
{
Name: containerName,
Image: image,
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
RestartPolicy: corev1.RestartPolicyAlways,
}
podSpec.NodeSelector = map[string]string{
"kubernetes.io/hostname": controlPlaneNodeName(),
}
podSpec.Tolerations = controlPlaneNoScheduleTolerations()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"app": perfTestAppLabel},
},
Spec: podSpec,
}
return pod
}
// setupTestPodsConnection applies the network policy which enables connectivity between test Pods in the cluster.
func setupTestPodsConnection(data *TestData) error {
npSpec := networkv1.NetworkPolicySpec{
PodSelector: *labelSelector,
Ingress: []networkv1.NetworkPolicyIngressRule{
{
From: []networkv1.NetworkPolicyPeer{{PodSelector: labelSelector}},
},
},
PolicyTypes: []networkv1.PolicyType{networkv1.PolicyTypeIngress},
}
np := &networkv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Name: podsConnectionNetworkPolicyName},
Spec: npSpec,
}
_, err := data.clientset.NetworkingV1().NetworkPolicies(data.testNamespace).Create(context.TODO(), np, metav1.CreateOptions{})
return err
}
func generateWorkloadNetworkPolicy(policyRules int) *networkv1.NetworkPolicy {
ingressRules := make([]networkv1.NetworkPolicyPeer, policyRules)
// #nosec G404: random number generator not used for security purposes
rnd := rand.New(rand.NewSource(seed))
existingCIDRs := make(map[string]struct{}) // ensure no duplicated cidrs
for i := 0; i < policyRules; i++ {
cidr := randCidr(rnd)
for _, ok := existingCIDRs[cidr]; ok; {
cidr = randCidr(rnd)
}
existingCIDRs[cidr] = struct{}{}
ingressRules[i] = networkv1.NetworkPolicyPeer{IPBlock: &networkv1.IPBlock{CIDR: cidr}}
}
npSpec := networkv1.NetworkPolicySpec{
PodSelector: *labelSelector,
Ingress: []networkv1.NetworkPolicyIngressRule{{From: ingressRules}},
PolicyTypes: []networkv1.PolicyType{networkv1.PolicyTypeIngress},
}
return &networkv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{Name: workloadNetworkPolicyName},
Spec: npSpec,
}
}
func populateWorkloadNetworkPolicy(np *networkv1.NetworkPolicy, data *TestData) error {
_, err := data.clientset.NetworkingV1().NetworkPolicies(data.testNamespace).Create(context.TODO(), np, metav1.CreateOptions{})
return err
}
func setupTestPods(data *TestData, b *testing.B) (nginxPodIP, perfPodIP *PodIPs) {
b.Logf("Creating a nginx test Pod")
nginxPod := createPerfTestPodDefinition(benchNginxPodName, nginxContainerName, nginxImage)
_, err := data.clientset.CoreV1().Pods(data.testNamespace).Create(context.TODO(), nginxPod, metav1.CreateOptions{})
if err != nil {
b.Fatalf("Error when creating nginx test pod: %v", err)
}
b.Logf("Waiting IP assignment of the nginx test Pod")
nginxPodIP, err = data.podWaitForIPs(defaultTimeout, benchNginxPodName, data.testNamespace)
if err != nil {
b.Fatalf("Error when waiting for IP assignment of nginx test Pod: %v", err)
}
b.Logf("Creating a toolbox test Pod")
perfPod := createPerfTestPodDefinition(toolboxPodName, toolboxContainerName, toolboxImage)
_, err = data.clientset.CoreV1().Pods(data.testNamespace).Create(context.TODO(), perfPod, metav1.CreateOptions{})
if err != nil {
b.Fatalf("Error when creating toolbox test Pod: %v", err)
}
b.Logf("Waiting for IP assignment of the toolbox test Pod")
perfPodIP, err = data.podWaitForIPs(defaultTimeout, toolboxPodName, data.testNamespace)
if err != nil {
b.Fatalf("Error when waiting for IP assignment of toolbox test Pod: %v", err)
}
return nginxPodIP, perfPodIP
}
// httpRequest runs a benchmark to measure intra-Node Pod-to-Pod HTTP request performance. It creates one toolbox
// Pod and one Nginx Pod, both on the control-plane Node. The toolbox will use apache-bench tool to issue perf.http.requests
// number of requests to the Nginx Pod. The number of concurrent requests will be determined by the value provided with
// the http.perf.concurrency command-line flag (default is 1, for sequential requests). policyRules indicates how many CIDR
// rules should be included in the network policy applied to the Pods.
func httpRequest(requests, policyRules int, data *TestData, b *testing.B) {
nginxPodIP, _ := setupTestPods(data, b)
// performance_test only runs in IPv4 cluster, so here only check the IPv4 address of nginx server Pod.
nginxPodIPStr := nginxPodIP.IPv4.String()
err := setupTestPodsConnection(data) // enable Pods connectivity policy first
if err != nil {
b.Fatalf("Error when adding network policy to set up connection between test Pods")
}
b.Log("Populating the workload network policy")
err = populateWorkloadNetworkPolicy(generateWorkloadNetworkPolicy(policyRules), data)
if err != nil {
b.Fatalf("Error when populating workload network policy: %v", err)
}
b.Log("Waiting for the workload network policy to be realized")
err = WaitNetworkPolicyRealize(controlPlaneNodeName(), openflow.IngressRuleTable, policyRules, data)
if err != nil {
b.Fatalf("Checking network policies realization failed: %v", err)
}
b.Log("Network policy realized")
serverURL := &url.URL{Scheme: "http", Host: nginxPodIPStr, Path: "/"}
b.StartTimer()
for i := 0; i < b.N; i++ {
b.Logf("Running http request bench %d/%d", i+1, b.N)
cmd := []string{"ab", "-n", fmt.Sprint(requests), "-c", fmt.Sprint(*httpConcurrency), serverURL.String()}
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, toolboxPodName, toolboxContainerName, cmd)
if err != nil {
b.Errorf("Error when running http request %dx: %v, stdout: %s, stderr: %s\n", requests, err, stdout, stderr)
}
}
}
// networkPolicyRealize runs a benchmark to measure how long it takes for a Network Policy with policyRules CIDR rules
// to be realized as OVS flows. In order to have entities for the Network Policy to be applied to, we create two dummy
// Pods with the "antrea-perf-test" app label, but they do not generate any traffic.
func networkPolicyRealize(policyRules int, data *TestData, b *testing.B) {
setupTestPods(data, b)
for i := 0; i < b.N; i++ {
go func() {
err := populateWorkloadNetworkPolicy(generateWorkloadNetworkPolicy(policyRules), data)
if err != nil {
// cannot use Fatal in a goroutine
// if populating policies fails, WaitNetworkPolicyRealize will
// eventually time out and the test will fail, although it would be
// better to fail early in that case.
b.Errorf("Error when populating workload network policy: %v", err)
}
}()
b.Log("Waiting for the network policy to be realized")
b.StartTimer()
err := WaitNetworkPolicyRealize(controlPlaneNodeName(), openflow.IngressRuleTable, policyRules, data)
if err != nil {
b.Fatalf("Checking network policies realization failed: %v", err)
}
b.StopTimer()
b.Log("Network policy realized")
err = data.clientset.NetworkingV1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), workloadNetworkPolicyName, metav1.DeleteOptions{})
if err != nil {
b.Fatalf("Error when cleaning up network policies after running one bench iteration: %v", err)
}
}
}
func WaitNetworkPolicyRealize(nodeName string, table *openflow.Table, policyRules int, data *TestData) error {
return wait.PollImmediate(50*time.Millisecond, *realizeTimeout, func() (bool, error) {
return checkRealize(nodeName, table, policyRules, data)
})
}
// checkRealize checks if all CIDR rules in the Network Policy have been realized as OVS flows. It counts the number of
// flows installed in the ingressRuleTable of the OVS bridge of the control-plane Node. This relies on the implementation
// knowledge that given a single ingress policy, the Antrea agent will install exactly one flow per CIDR rule in table
// IngressRule. checkRealize returns true when the number of flows exceeds the number of CIDR, because each table has a
// default flow entry which is used for default matching.
// Since the check is done over SSH, the time measurement is not completely accurate.
func checkRealize(nodeName string, table *openflow.Table, policyRules int, data *TestData) (bool, error) {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
return false, err
}
// table IngressRule is the ingressRuleTable where the rules in workload network policy is being applied to.
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%s", table.GetName())}
stdout, _, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, "antrea-agent", cmd)
if err != nil {
return false, err
}
flowNums := strings.Count(stdout, "\n")
return flowNums > policyRules, nil
}
// withPerfTestSetup runs function fn in a clean test environment.
// It ensures no stale flow rules in ovs and the bench timer is stopped and reset.
func withPerfTestSetup(fn func(data *TestData), b *testing.B) {
b.StopTimer()
b.ResetTimer()
defer b.StopTimer()
data, err := setupTest(b)
if err != nil {
b.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(b, data)
b.Logf("Deleting Antrea Agent DaemonSet to flush ovs cache")
if err := data.deleteAntrea(defaultTimeout); err != nil {
b.Fatalf("Error when deleting Antrea DaemonSet: %v", err)
}
b.Logf("Applying Antrea YAML")
if err := data.deployAntrea(deployAntreaDefault); err != nil {
b.Fatalf("Error when restarting Antrea: %v", err)
}
b.Logf("Waiting for all Antrea DaemonSet Pods")
if err := data.waitForAntreaDaemonSetPods(defaultTimeout); err != nil {
b.Fatalf("Error when restarting Antrea: %v", err)
}
fn(data)
}