Skip to content

Commit

Permalink
disk: implement ModifyVolume
Browse files Browse the repository at this point in the history
  • Loading branch information
huww98 committed Dec 19, 2024
1 parent 762a76a commit 6d4c6ef
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/ram-policies/disk/controller.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
"ecs:DescribeSnapshots",
"ecs:DescribeTags",
"ecs:DescribeTaskAttribute",
"ecs:DescribeTasks",
"ecs:DetachDisk",
"ecs:ListTagResources",
"ecs:ModifyDiskAttribute",
"ecs:ModifyDiskSpec",
"ecs:RemoveTags",
"ecs:ResizeDisk",
Expand Down
6 changes: 6 additions & 0 deletions pkg/cloud/ecsinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@ type ECSInterface interface {
DescribeDisks(request *ecs.DescribeDisksRequest) (response *ecs.DescribeDisksResponse, err error)
ResizeDisk(request *ecs.ResizeDiskRequest) (response *ecs.ResizeDiskResponse, err error)
DescribeSnapshots(request *ecs.DescribeSnapshotsRequest) (response *ecs.DescribeSnapshotsResponse, err error)

TagResources(request *ecs.TagResourcesRequest) (response *ecs.TagResourcesResponse, err error)
UntagResources(request *ecs.UntagResourcesRequest) (response *ecs.UntagResourcesResponse, err error)
ModifyDiskSpec(request *ecs.ModifyDiskSpecRequest) (response *ecs.ModifyDiskSpecResponse, err error)
ModifyDiskAttribute(request *ecs.ModifyDiskAttributeRequest) (response *ecs.ModifyDiskAttributeResponse, err error)
DescribeTasks(request *ecs.DescribeTasksRequest) (response *ecs.DescribeTasksResponse, err error)
}
75 changes: 75 additions & 0 deletions pkg/cloud/ecsmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 10 additions & 6 deletions pkg/disk/constants.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package disk

const (

// ESSD_PERFORMANCE_LEVEL is storage class
ESSD_PERFORMANCE_LEVEL = "performanceLevel"
// DISKTAGKEY1 tag
DISKTAGKEY1 = "k8s.aliyun.com"
// DISKTAGVALUE1 value
Expand All @@ -18,6 +15,16 @@ const (
SNAPSHOTTAGKEY1 = "force.delete.snapshot.k8s.aliyun.com"
)

// keys used in CreateVolumeRequest.Parameters and MutableParameters
const (
DISK_TYPE = "type"
ESSD_PERFORMANCE_LEVEL = "performanceLevel"
PROVISIONED_IOPS_KEY = "provisionedIops"
BURSTING_ENABLED_KEY = "burstingEnabled"
DISK_TAG_PREFIX = "diskTags/"
REMOVE_DISK_TAG_PREFIX = "-diskTags/"
)

// keys used in CreateSnapshotRequest.Parameters
const (
SNAPSHOTTYPE = "snapshotType"
Expand Down Expand Up @@ -128,9 +135,6 @@ const (
VOLUME_EXPAND_AUTO_SNAPSHOT_OP_KEY = "volumeExpandAutoSnapshot"
VOLUME_DELETE_AUTO_SNAPSHOT_OP_RETENT_DAYS_KEY = "volumeDeleteSnapshotRetentionDays"

PROVISIONED_IOPS_KEY = "provisionedIops"
BURSTING_ENABLED_KEY = "burstingEnabled"

CSI_DEFAULT_FS_TYPE = "csi.storage.k8s.io/fstype"
FS_TYPE = "fsType"
EXT4_FSTYPE = "ext4"
Expand Down
43 changes: 43 additions & 0 deletions pkg/disk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"google.golang.org/grpc/codes"
Expand All @@ -42,11 +45,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

// controller server try to create/delete volumes/snapshots
type controllerServer struct {
recorder record.EventRecorder
modify ModifyServer
common.GenericControllerServer
}

Expand Down Expand Up @@ -80,10 +85,24 @@ var veasp = struct {

var delVolumeSnap sync.Map

func newTaskStatusWaiter() waitstatus.StatusWaiter[ecs.Task] {
client := desc.Task{Client: GlobalConfigVar.EcsClient}
waiter := waitstatus.NewBatched(client, clock.RealClock{}, 3*time.Second, 10*time.Second)
waiter.PollHook = func() desc.Client[ecs.Task] {
return desc.Task{Client: updateEcsClient(GlobalConfigVar.EcsClient)}
}
go waiter.Run(context.Background())
return waiter
}

// NewControllerServer is to create controller server
func NewControllerServer() csi.ControllerServer {
c := &controllerServer{
recorder: utils.NewEventRecorder(),
modify: ModifyServer{
ecsClient: GlobalConfigVar.EcsClient,
taskWaiter: newTaskStatusWaiter(),
},
}
return c
}
Expand All @@ -96,6 +115,7 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
),
}, nil
}
Expand Down Expand Up @@ -136,6 +156,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, err)
}

if len(req.MutableParameters) > 0 {
mutable, err := parseMutableParameters(req.MutableParameters)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid mutable parameters: %v", err)
}
importMutableParameters(diskVol, &mutable)
}

// 兼容 serverless 拓扑感知场景;
// req参数里面包含了云盘ID,则直接使用云盘ID进行返回;
csiVolume, err := staticVolumeCreate(req, snapshotID)
Expand Down Expand Up @@ -970,3 +998,18 @@ func (cs *controllerServer) deleteUntagAutoSnapshot(snapshotID, diskID string) {
klog.Errorf("ControllerExpandVolume:: failed to untag volumeExpandAutoSnapshot: %s", err.Error())
}
}

func (cs *controllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
params, err := parseMutableParameters(req.MutableParameters)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
err = cs.modify.Modify(ctx, req.VolumeId, params)
if err != nil {
if errors.Is(err, wrap.ErrorCode("InvalidDiskId.NotFound")) {
return nil, status.Error(codes.NotFound, err.Error())
}
return nil, err
}
return &csi.ControllerModifyVolumeResponse{}, nil
}
43 changes: 43 additions & 0 deletions pkg/disk/desc/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package desc

import (
"strings"

"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
)

type Task struct {
*ecs.Client
}

func (c Task) Describe(ids []string) (Response[ecs.Task], error) {
req := ecs.CreateDescribeTasksRequest()
req.TaskIds = strings.Join(ids, ",")
req.PageSize = requests.NewInteger(batchSize)

ret := Response[ecs.Task]{}
resp, err := c.Client.DescribeTasks(req)
if err != nil {
return ret, err
}
ret.RequestID = resp.RequestId
ret.Resources = resp.TaskSet.Task
return ret, nil
}

func (c Task) GetID(resource *ecs.Task) string {
return resource.TaskId
}

func (c Task) Type() string {
return "task"
}

func (c Task) BatchSize() int {
return batchSize
}

func TaskSattled(t *ecs.Task) bool {
return t.TaskStatus == "Finished" || t.TaskStatus == "Failed"
}
Loading

0 comments on commit 6d4c6ef

Please sign in to comment.