Skip to content

Commit

Permalink
Add multicluster support
Browse files Browse the repository at this point in the history
This commit introduces multicluster support, which allows to specify
separate K8s contexts for the iperf3 client and server.

An example usage is:
```
k8s-iperf run --k8s-server-context kind-1 --k8s-client-context kind-2 --k8s-multicluster true --k8s-service-annotation "service.cilium.io/global: true"
```

This is using Cilium Cluster Mesh to test cross cluster throughput.

Signed-off-by: darox <[email protected]>
  • Loading branch information
darox committed Oct 11, 2024
1 parent 668ec0f commit 2b1e9fb
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 58 deletions.
33 changes: 24 additions & 9 deletions pkg/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ var runCmd = &cobra.Command{
clientNode, _ := cmd.Flags().GetString("k8s-client-node")
domain, _ := cmd.Flags().GetString("k8s-domain")
cleanup, _ := cmd.Flags().GetBool("k8s-cleanup")
serverContext, _ := cmd.Flags().GetString("k8s-server-context")
clientContext, _ := cmd.Flags().GetString("k8s-client-context")
multiCluster, _ := cmd.Flags().GetBool("k8s-multi-cluster")
serviceAnnotation, _ := cmd.Flags().GetString("k8s-service-annotation")
serverClientSet, err := k8s.NewClient(serverContext)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

client, err := k8s.NewClient()
clientClientSet, err := k8s.NewClient(clientContext)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
Expand All @@ -38,14 +46,17 @@ var runCmd = &cobra.Command{
}

config := iperf.TestConfig{
Client: client,
Namespace: namespace,
Image: image,
IperfArgs: iperfArgs,
ServerNode: serverNode,
ClientNode: clientNode,
Domain: domain,
Cleanup: cleanup,
ClientClientSet: clientClientSet,
ServerClientSet: serverClientSet,
Namespace: namespace,
Image: image,
IperfArgs: iperfArgs,
ServerNode: serverNode,
ClientNode: clientNode,
Domain: domain,
Cleanup: cleanup,
MultiCluster: multiCluster,
ServiceAnnotation: serviceAnnotation,
}

return iperf.RunTest(config)
Expand All @@ -59,6 +70,10 @@ func init() {
runCmd.Flags().StringP("k8s-client-node", "", "", "Client node to use for the test")
runCmd.Flags().StringP("k8s-domain", "", "cluster.local", "Kubernetes domain to use for the test")
runCmd.Flags().BoolP("k8s-cleanup", "", true, "Cleanup resources after the test")
runCmd.Flags().StringP("k8s-server-context", "", "", "Kubernetes server context to use for the test")
runCmd.Flags().StringP("k8s-client-context", "", "", "Kubernetes client context to use for the test")
runCmd.Flags().BoolP("k8s-multi-cluster", "", false, "Run the test in multi-cluster mode")
runCmd.Flags().StringP("k8s-service-annotation", "", "", "Service annotation to use for the test")
rootCmd.AddCommand(runCmd)
}

Expand Down
158 changes: 111 additions & 47 deletions pkg/iperf/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/fatih/color"
)

// At the package level, add this static error:
// Used for Cilium Cluster Mesh to test across two different clusters

var errIperf3ClientPodFailed = errors.New("iperf3 client pod failed")

// Add static error variables
var (
errEmptyLogs = errors.New("iperf3 client returned empty logs")
errMissingStartField = errors.New("missing or invalid 'start' field in JSON data")
Expand All @@ -34,16 +34,21 @@ var (
errInvalidConnectedData = errors.New("invalid 'connected' data structure in JSON data")
)

var errCleanup = errors.New("cleanup errors occurred")

// TestConfig holds the configuration for the iperf3 test
type TestConfig struct {
Client *kubernetes.Clientset
Namespace string
Domain string
Image string
IperfArgs []string
ServerNode string
ClientNode string
Cleanup bool
ServerClientSet *kubernetes.Clientset
ClientClientSet *kubernetes.Clientset
Namespace string
Domain string
Image string
IperfArgs []string
ServerNode string
ClientNode string
Cleanup bool
MultiCluster bool
ServiceAnnotation string
}

func RunTest(config TestConfig) error {
Expand All @@ -66,18 +71,18 @@ func RunTest(config TestConfig) error {
case <-sigChan:
fmt.Println("Received interrupt signal, cleaning up resources...")
cancel() // Cancel the context to stop ongoing operations
return cleanup(config.Client, config.Namespace)
return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
case <-ctx.Done():
fmt.Println("Test cancelled, cleaning up resources...")
return cleanup(config.Client, config.Namespace)
return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
}
}

func runTestInternal(ctx context.Context, config TestConfig) error {
// Defer cleanup if Cleanup is set to true
if config.Cleanup {
defer func() {
if err := cleanup(config.Client, config.Namespace); err != nil {
if err := cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster); err != nil {
fmt.Printf("Failed to cleanup resources: %v\n", err)
}
}()
Expand All @@ -88,23 +93,23 @@ func runTestInternal(ctx context.Context, config TestConfig) error {
color.Red("✘ Failed to deploy iperf3 server: %v", err)
return fmt.Errorf("failed to deploy iperf3 server: %w", err)
}
color.Green("✔ iperf3 server deployed successfully")
color.Green("✔ iperf3 server created successfully")

// Create service for iperf3 server
if err := createIperf3Service(config.Client, config.Namespace); err != nil {
if err := createIperf3Service(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster, config.ServiceAnnotation); err != nil {
color.Red("✘ Failed to create iperf3 service: %v", err)
return fmt.Errorf("failed to create iperf3 service: %w", err)
}
color.Green("✔ iperf3 service created successfully")

// Wait for server to be ready
if err := waitForDeploymentReady(config.Client, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
if err := waitForDeploymentReady(config.ServerClientSet, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
color.Red("✘ iperf3 server failed to become ready: %v", err)
return fmt.Errorf("iperf3 server failed to become ready: %w", err)
}

// Add this: Wait for the iperf3 server pod to be ready
if err := waitForPodReady(ctx, config.Client, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
// Wait for the iperf3 server pod to be ready
if err := waitForPodReady(ctx, config.ServerClientSet, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
color.Red("✘ iperf3 server pod failed to become ready: %v", err)
return fmt.Errorf("iperf3 server pod failed to become ready: %w", err)
}
Expand Down Expand Up @@ -185,11 +190,11 @@ func deployIperf3Server(config TestConfig) error {
}
}

_, err := config.Client.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
_, err := config.ServerClientSet.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
return err
}

func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
func createIperf3Service(clientSetServer *kubernetes.Clientset, clientSetClient *kubernetes.Clientset, namespace string, multiCluster bool, serviceAnnotation string) error {
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "iperf3-server",
Expand All @@ -210,7 +215,31 @@ func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
},
}

_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
// signature of serviceAnnotation is key1=value1,key2=value2. This is useful for adding the Cilium Cluster Mesh annotation
if serviceAnnotation != "" {

// init map for annotations
service.Annotations = make(map[string]string)

for _, v := range strings.Split(serviceAnnotation, ",") {
kv := strings.Split(v, "=")
if len(kv) == 2 {
service.Annotations[kv[0]] = kv[1]
}
}
}

if multiCluster {
for _, client := range []*kubernetes.Clientset{clientSetServer, clientSetClient} {
_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service in cluster: %w", err)
}
}
return nil
}

_, err := clientSetServer.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
return err
}

Expand Down Expand Up @@ -255,12 +284,12 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
}
}

_, err := config.Client.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
_, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return err
}

color.Green("✔ iperf3 client pod created successfully")
color.Green("✔ iperf3 client created successfully")

color.Green("► Starting iperf3 test")

Expand All @@ -271,7 +300,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
case <-ctx.Done():
return ctx.Err()
default:
pod, err := config.Client.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
pod, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
if err != nil {
return err
}
Expand All @@ -287,7 +316,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {

PodCompleted:
// Get and print the logs
logs, err := config.Client.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
logs, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
if err != nil {
return err
}
Expand Down Expand Up @@ -352,29 +381,29 @@ func printIperfSummary(jsonData []byte) error {

// Prepare all lines
lines := []string{
fmt.Sprintf("Connection Details:"),
fmt.Sprintf(" Local: %s:%d", connectedMap["local_host"], int(connectedMap["local_port"].(float64))),
fmt.Sprintf(" Remote: %s:%d", connectedMap["remote_host"], int(connectedMap["remote_port"].(float64))),
fmt.Sprintf(""),
fmt.Sprintf("Test Configuration:"),
fmt.Sprintf(" Protocol: %s", testStart["protocol"]),
fmt.Sprintf(" Duration: %.2f seconds", sumSent["seconds"]),
fmt.Sprintf(" Parallel Streams: %d", int(testStart["num_streams"].(float64))),
fmt.Sprintf(""),
fmt.Sprintf("Results:"),
fmt.Sprintf(" Sent: %.2f Mbits/sec", sumSent["bits_per_second"].(float64)/1e6),
fmt.Sprintf(" Received: %.2f Mbits/sec", sumReceived["bits_per_second"].(float64)/1e6),
"Connection Details:",
fmt.Sprintf(" Local: %s:%d", getStringValue(connectedMap, "local_host"), getIntValue(connectedMap, "local_port")),
fmt.Sprintf(" Remote: %s:%d", getStringValue(connectedMap, "remote_host"), getIntValue(connectedMap, "remote_port")),
"",
"Test Configuration:",
fmt.Sprintf(" Protocol: %s", getStringValue(testStart, "protocol")),
fmt.Sprintf(" Duration: %.2f seconds", getFloatValue(sumSent, "seconds")),
fmt.Sprintf(" Parallel Streams: %d", getIntValue(testStart, "num_streams")),
"",
"Results:",
fmt.Sprintf(" Sent: %.2f Mbits/sec", getFloatValue(sumSent, "bits_per_second")/1e6),
fmt.Sprintf(" Received: %.2f Mbits/sec", getFloatValue(sumReceived, "bits_per_second")/1e6),
}

if retransmits, ok := sumSent["retransmits"]; ok {
lines = append(lines, fmt.Sprintf(" Retransmits: %d", int(retransmits.(float64))))
}

lines = append(lines,
fmt.Sprintf(""),
fmt.Sprintf("CPU Utilization:"),
fmt.Sprintf(" Local: %.2f%%", cpuUtil["host_total"].(float64)),
fmt.Sprintf(" Remote: %.2f%%", cpuUtil["remote_total"].(float64)),
"",
"CPU Utilization:",
fmt.Sprintf(" Local: %.2f%%", getFloatValue(cpuUtil, "host_total")),
fmt.Sprintf(" Remote: %.2f%%", getFloatValue(cpuUtil, "remote_total")),
)

// Calculate the length of the longest line
Expand Down Expand Up @@ -403,16 +432,30 @@ func printIperfSummary(jsonData []byte) error {
return nil
}

func cleanup(client *kubernetes.Clientset, namespace string) error {
if err := client.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
return err
func cleanup(serverClientSet, clientClientSet *kubernetes.Clientset, namespace string, multiCluster bool) error {
var errs []error

if err := serverClientSet.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-server deployment: %w", err))
}

if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
return err
if err := clientClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-client pod: %w", err))
}

return client.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{})
if multiCluster {
for _, client := range []*kubernetes.Clientset{serverClientSet, clientClientSet} {
if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-server service in cluster: %w", err))
}
}
}

if len(errs) > 0 {
return fmt.Errorf("%w: %v", errCleanup, errs)
}

return nil
}

func int32Ptr(i int32) *int32 {
Expand Down Expand Up @@ -463,3 +506,24 @@ func waitForPodReady(ctx context.Context, client *kubernetes.Clientset, namespac
return false, nil
})
}

func getStringValue(m map[string]interface{}, key string) string {
if value, ok := m[key].(string); ok {
return value
}
return "N/A"
}

func getIntValue(m map[string]interface{}, key string) int {
if value, ok := m[key].(float64); ok {
return int(value)
}
return 0
}

func getFloatValue(m map[string]interface{}, key string) float64 {
if value, ok := m[key].(float64); ok {
return value
}
return 0.0
}
19 changes: 17 additions & 2 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,29 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

func NewClient() (*kubernetes.Clientset, error) {
func NewClient(context string) (*kubernetes.Clientset, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, err
}

kubeconfig := filepath.Join(homeDir, ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)

// Create a config loading rule that prefers the provided context
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = kubeconfig

// Create a config overrides struct and set the context if provided
overrides := &clientcmd.ConfigOverrides{}
if context != "" {
overrides.CurrentContext = context
}

// Create a client config using the loading rules and overrides
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)

// Get the rest config
config, err := clientConfig.ClientConfig()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 2b1e9fb

Please sign in to comment.