Skip to content

Commit

Permalink
cmd: add testing tool for volume replication
Browse files Browse the repository at this point in the history
Signed-off-by: riya-singhal31 <[email protected]>
  • Loading branch information
riya-singhal31 committed Jul 28, 2023
1 parent a777675 commit 17450f2
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 1 deletion.
11 changes: 10 additions & 1 deletion cmd/csi-addons/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ $ kubectl exec -c csi-addons csi-backend-nodeplugin -- csi-addons -h
staging path (default "/var/lib/kubelet/plugins/kubernetes.io/csi/")
-version
print Version details
-volumeID string
id of the volume(Volume Handle)

The following operations are supported:
- ControllerReclaimSpace
Expand All @@ -38,8 +40,15 @@ The following operations are supported:
- Probe
- NetworkFence
- NetworkUnFence
- ControllerReclaimSpace
- EnableVolumeReplication
- DisableVolumeReplication
- PromoteVolume
- DemoteVolume
- ResyncVolume
- GetVolumeReplicationInfo
```

The above command assumes the running `csi-backend-nodeplugin` Pod has the
`quay.io/csiaddons/k8s-sidecar` as a container named `csi-addons`. Executing
the `csi-addons -h` command then shows the help text.
the `csi-addons -h` command then shows the help text.
2 changes: 2 additions & 0 deletions cmd/csi-addons/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type command struct {
cidrs string
clusterid string
legacy bool
volumeID string
}

// cmd is the single instance of the command struct, used inside main().
Expand All @@ -64,6 +65,7 @@ func init() {
flag.StringVar(&cmd.clusterid, "clusterid", "", "clusterID")
flag.BoolVar(&cmd.legacy, "legacy", false, "use legacy format for old Kubernetes versions")
flag.BoolVar(&showVersion, "version", false, "print Version details")
flag.StringVar(&cmd.volumeID, "volumeID", "", "ID of the volume(Volume Handle)")

// output to show when --help is passed
flag.Usage = func() {
Expand Down
239 changes: 239 additions & 0 deletions cmd/csi-addons/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
Copyright 2023 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"strings"

"github.com/csi-addons/kubernetes-csi-addons/internal/proto"
"github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service"
)

type VolumeReplicationBase struct {
// inherit Connect() and Close() from type grpcClient
grpcClient

parameters map[string]string
secretName string
secretNamespace string
volumeID string
}

func verifyParameters(clusterID string, secretName string, secretNamespace string, volumeID string) error {
if clusterID == "" {
return fmt.Errorf("clusterID not set")
}

if secretName == "" {
return fmt.Errorf("secret name is not set")
}

if secretNamespace == "" {
return fmt.Errorf("secret namespace is not set")
}

if volumeID == "" {
return fmt.Errorf("volume ID is not set")
}

return nil
}

func (rep *VolumeReplicationBase) Init(c *command) error {
rep.parameters = make(map[string]string)
rep.parameters["clusterID"] = c.clusterid

secrets := strings.Split(c.secret, "/")
if len(secrets) != 2 {
return fmt.Errorf("secret should be specified in the format `namespace/name`")
}
rep.secretNamespace = secrets[0]
rep.secretName = secrets[1]
rep.volumeID = c.volumeID

return verifyParameters(rep.parameters["clusterID"], rep.secretName, rep.secretNamespace, rep.volumeID)
}

// EnableVolumeReplication executes the EnableVolumeReplication operation.
type EnableVolumeReplication struct {
VolumeReplicationBase
}

var _ = registerOperation("EnableVolumeReplication", &EnableVolumeReplication{})

func (rep *EnableVolumeReplication) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.EnableVolumeReplicationRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.EnableVolumeReplication(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("Enable Volume Replication operation successful")

return nil
}

// DisableVolumeReplication executes the DisableVolumeReplication operation.
type DisableVolumeReplication struct {
VolumeReplicationBase
}

var _ = registerOperation("DisableVolumeReplication", &DisableVolumeReplication{})

func (rep *DisableVolumeReplication) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.DisableVolumeReplicationRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.DisableVolumeReplication(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("Disable Volume Replication operation successful")

return nil
}

// PromoteVolume executes the PromoteVolume operation.
type PromoteVolume struct {
VolumeReplicationBase
}

var _ = registerOperation("PromoteVolume", &PromoteVolume{})

func (rep *PromoteVolume) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.PromoteVolumeRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.PromoteVolume(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("Promote Volume operation successful")

return nil
}

// DemoteVolume executes the DemoteVolume operation.
type DemoteVolume struct {
VolumeReplicationBase
}

var _ = registerOperation("DemoteVolume", &DemoteVolume{})

func (rep *DemoteVolume) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.DemoteVolumeRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.DemoteVolume(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("Demote Volume operation successful")

return nil
}

// ResyncVolume executes the ResyncVolume operation.
type ResyncVolume struct {
VolumeReplicationBase
}

var _ = registerOperation("ResyncVolume", &ResyncVolume{})

func (rep *ResyncVolume) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.ResyncVolumeRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.ResyncVolume(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("Resync Volume operation successful")

return nil
}

// GetVolumeReplicationInfo executes the GetVolumeReplicationInfo operation.
type GetVolumeReplicationInfo struct {
VolumeReplicationBase
}

var _ = registerOperation("GetVolumeReplicationInfo", &GetVolumeReplicationInfo{})

func (rep *GetVolumeReplicationInfo) Execute() error {
k := getKubernetesClient()

rs := service.NewReplicationServer(rep.Client, k)

req := &proto.GetVolumeReplicationInfoRequest{
SecretName: rep.secretName,
SecretNamespace: rep.secretNamespace,
VolumeId: rep.volumeID,
}

_, err := rs.GetVolumeReplicationInfo(context.TODO(), req)
if err != nil {
return err
}

fmt.Printf("GetVolumeReplicationInfo operation successful")

return nil
}

0 comments on commit 17450f2

Please sign in to comment.