From f751cd66a0590acf8ceee13548bfdb8600b6f0f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Tue, 3 Dec 2024 14:13:09 +0800 Subject: [PATCH] disk: implement ModifyVolume --- docs/ram-policies/disk/controller.json | 2 + pkg/cloud/ecsinterface.go | 6 + pkg/cloud/ecsmock.go | 75 +++++++++ pkg/disk/constants.go | 16 +- pkg/disk/controllerserver.go | 43 ++++++ pkg/disk/desc/task.go | 43 ++++++ pkg/disk/modify.go | 206 +++++++++++++++++++++++++ pkg/disk/utils.go | 68 +++++++- 8 files changed, 448 insertions(+), 11 deletions(-) create mode 100644 pkg/disk/desc/task.go create mode 100644 pkg/disk/modify.go diff --git a/docs/ram-policies/disk/controller.json b/docs/ram-policies/disk/controller.json index 726f54b39..b885f189c 100644 --- a/docs/ram-policies/disk/controller.json +++ b/docs/ram-policies/disk/controller.json @@ -20,8 +20,10 @@ "ecs:DescribeSnapshots", "ecs:DescribeTags", "ecs:DescribeTaskAttribute", + "ecs:DescribeTasks", "ecs:DetachDisk", "ecs:ListTagResources", + "ecs:ModifyDiskAttribute", "ecs:ModifyDiskSpec", "ecs:RemoveTags", "ecs:ResizeDisk", diff --git a/pkg/cloud/ecsinterface.go b/pkg/cloud/ecsinterface.go index 2d5d0dfab..41d3fa2c9 100644 --- a/pkg/cloud/ecsinterface.go +++ b/pkg/cloud/ecsinterface.go @@ -11,4 +11,10 @@ type ECSInterface interface { DescribeDisks(request *ecs.DescribeDisksRequest) (response *ecs.DescribeDisksResponse, err error) ResizeDisk(request *ecs.ResizeDiskRequest) (response *ecs.ResizeDiskResponse, err error) DescribeSnapshots(request *ecs.DescribeSnapshotsRequest) (response *ecs.DescribeSnapshotsResponse, err error) + + TagResources(request *ecs.TagResourcesRequest) (response *ecs.TagResourcesResponse, err error) + UntagResources(request *ecs.UntagResourcesRequest) (response *ecs.UntagResourcesResponse, err error) + ModifyDiskSpec(request *ecs.ModifyDiskSpecRequest) (response *ecs.ModifyDiskSpecResponse, err error) + ModifyDiskAttribute(request *ecs.ModifyDiskAttributeRequest) (response *ecs.ModifyDiskAttributeResponse, err error) + DescribeTasks(request *ecs.DescribeTasksRequest) (response *ecs.DescribeTasksResponse, err error) } diff --git a/pkg/cloud/ecsmock.go b/pkg/cloud/ecsmock.go index 2f309d925..dda3cc21f 100644 --- a/pkg/cloud/ecsmock.go +++ b/pkg/cloud/ecsmock.go @@ -139,6 +139,51 @@ func (mr *MockECSInterfaceMockRecorder) DescribeSnapshots(request interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeSnapshots", reflect.TypeOf((*MockECSInterface)(nil).DescribeSnapshots), request) } +// DescribeTasks mocks base method. +func (m *MockECSInterface) DescribeTasks(request *ecs.DescribeTasksRequest) (*ecs.DescribeTasksResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeTasks", request) + ret0, _ := ret[0].(*ecs.DescribeTasksResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeTasks indicates an expected call of DescribeTasks. +func (mr *MockECSInterfaceMockRecorder) DescribeTasks(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeTasks", reflect.TypeOf((*MockECSInterface)(nil).DescribeTasks), request) +} + +// ModifyDiskAttribute mocks base method. +func (m *MockECSInterface) ModifyDiskAttribute(request *ecs.ModifyDiskAttributeRequest) (*ecs.ModifyDiskAttributeResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyDiskAttribute", request) + ret0, _ := ret[0].(*ecs.ModifyDiskAttributeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ModifyDiskAttribute indicates an expected call of ModifyDiskAttribute. +func (mr *MockECSInterfaceMockRecorder) ModifyDiskAttribute(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyDiskAttribute", reflect.TypeOf((*MockECSInterface)(nil).ModifyDiskAttribute), request) +} + +// ModifyDiskSpec mocks base method. +func (m *MockECSInterface) ModifyDiskSpec(request *ecs.ModifyDiskSpecRequest) (*ecs.ModifyDiskSpecResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyDiskSpec", request) + ret0, _ := ret[0].(*ecs.ModifyDiskSpecResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ModifyDiskSpec indicates an expected call of ModifyDiskSpec. +func (mr *MockECSInterfaceMockRecorder) ModifyDiskSpec(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyDiskSpec", reflect.TypeOf((*MockECSInterface)(nil).ModifyDiskSpec), request) +} + // ResizeDisk mocks base method. func (m *MockECSInterface) ResizeDisk(request *ecs.ResizeDiskRequest) (*ecs.ResizeDiskResponse, error) { m.ctrl.T.Helper() @@ -153,3 +198,33 @@ func (mr *MockECSInterfaceMockRecorder) ResizeDisk(request interface{}) *gomock. mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResizeDisk", reflect.TypeOf((*MockECSInterface)(nil).ResizeDisk), request) } + +// TagResources mocks base method. +func (m *MockECSInterface) TagResources(request *ecs.TagResourcesRequest) (*ecs.TagResourcesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TagResources", request) + ret0, _ := ret[0].(*ecs.TagResourcesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TagResources indicates an expected call of TagResources. +func (mr *MockECSInterfaceMockRecorder) TagResources(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TagResources", reflect.TypeOf((*MockECSInterface)(nil).TagResources), request) +} + +// UntagResources mocks base method. +func (m *MockECSInterface) UntagResources(request *ecs.UntagResourcesRequest) (*ecs.UntagResourcesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UntagResources", request) + ret0, _ := ret[0].(*ecs.UntagResourcesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UntagResources indicates an expected call of UntagResources. +func (mr *MockECSInterfaceMockRecorder) UntagResources(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UntagResources", reflect.TypeOf((*MockECSInterface)(nil).UntagResources), request) +} diff --git a/pkg/disk/constants.go b/pkg/disk/constants.go index aaa8eb2f5..c8dffe28a 100644 --- a/pkg/disk/constants.go +++ b/pkg/disk/constants.go @@ -1,9 +1,6 @@ package disk const ( - - // ESSD_PERFORMANCE_LEVEL is storage class - ESSD_PERFORMANCE_LEVEL = "performanceLevel" // DISKTAGKEY1 tag DISKTAGKEY1 = "k8s.aliyun.com" // DISKTAGVALUE1 value @@ -18,6 +15,16 @@ const ( SNAPSHOTTAGKEY1 = "force.delete.snapshot.k8s.aliyun.com" ) +// keys used in CreateVolumeRequest.Parameters and MutableParameters +const ( + DISK_TYPE = "type" + ESSD_PERFORMANCE_LEVEL = "performanceLevel" + PROVISIONED_IOPS_KEY = "provisionedIops" + BURSTING_ENABLED_KEY = "burstingEnabled" + DISK_TAG_PREFIX = "diskTags/" + REMOVE_DISK_TAG_PREFIX = "-diskTags/" +) + // keys used in CreateSnapshotRequest.Parameters const ( SNAPSHOTTYPE = "snapshotType" @@ -128,9 +135,6 @@ const ( VOLUME_EXPAND_AUTO_SNAPSHOT_OP_KEY = "volumeExpandAutoSnapshot" VOLUME_DELETE_AUTO_SNAPSHOT_OP_RETENT_DAYS_KEY = "volumeDeleteSnapshotRetentionDays" - PROVISIONED_IOPS_KEY = "provisionedIops" - BURSTING_ENABLED_KEY = "burstingEnabled" - CSI_DEFAULT_FS_TYPE = "csi.storage.k8s.io/fstype" FS_TYPE = "fsType" EXT4_FSTYPE = "ext4" diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index 4a64a6198..605293ed6 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -31,7 +31,10 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes/timestamp" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" "google.golang.org/grpc/codes" @@ -43,11 +46,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/clock" ) // controller server try to create/delete volumes/snapshots type controllerServer struct { recorder record.EventRecorder + modify ModifyServer common.GenericControllerServer } @@ -81,10 +86,24 @@ var veasp = struct { var delVolumeSnap sync.Map +func newTaskStatusWaiter() waitstatus.StatusWaiter[ecs.Task] { + client := desc.Task{Client: GlobalConfigVar.EcsClient} + waiter := waitstatus.NewBatched(client, clock.RealClock{}, 3*time.Second, 10*time.Second) + waiter.PollHook = func() desc.Client[ecs.Task] { + return desc.Task{Client: updateEcsClient(GlobalConfigVar.EcsClient)} + } + go waiter.Run(context.Background()) + return waiter +} + // NewControllerServer is to create controller server func NewControllerServer() csi.ControllerServer { c := &controllerServer{ recorder: utils.NewEventRecorder(), + modify: ModifyServer{ + ecsClient: GlobalConfigVar.EcsClient, + taskWaiter: newTaskStatusWaiter(), + }, } return c } @@ -97,6 +116,7 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req * csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, ), }, nil } @@ -137,6 +157,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Errorf(codes.InvalidArgument, "Invalid parameters from input: %v, with error: %v", req.Name, err) } + if len(req.MutableParameters) > 0 { + mutable, err := parseMutableParameters(req.MutableParameters) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid mutable parameters: %v", err) + } + importMutableParameters(diskVol, &mutable) + } + // 兼容 serverless 拓扑感知场景; // req参数里面包含了云盘ID,则直接使用云盘ID进行返回; csiVolume, err := staticVolumeCreate(req, snapshotID) @@ -971,3 +999,18 @@ func (cs *controllerServer) deleteUntagAutoSnapshot(snapshotID, diskID string) { klog.Errorf("ControllerExpandVolume:: failed to untag volumeExpandAutoSnapshot: %s", err.Error()) } } + +func (cs *controllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) { + params, err := parseMutableParameters(req.MutableParameters) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + err = cs.modify.Modify(ctx, req.VolumeId, params) + if err != nil { + if errors.Is(err, wrap.ErrorCode("InvalidDiskId.NotFound")) { + return nil, status.Error(codes.NotFound, err.Error()) + } + return nil, err + } + return &csi.ControllerModifyVolumeResponse{}, nil +} diff --git a/pkg/disk/desc/task.go b/pkg/disk/desc/task.go new file mode 100644 index 000000000..bf3e50ec8 --- /dev/null +++ b/pkg/disk/desc/task.go @@ -0,0 +1,43 @@ +package desc + +import ( + "strings" + + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" +) + +type Task struct { + *ecs.Client +} + +func (c Task) Describe(ids []string) (Response[ecs.Task], error) { + req := ecs.CreateDescribeTasksRequest() + req.TaskIds = strings.Join(ids, ",") + req.PageSize = requests.NewInteger(batchSize) + + ret := Response[ecs.Task]{} + resp, err := c.Client.DescribeTasks(req) + if err != nil { + return ret, err + } + ret.RequestID = resp.RequestId + ret.Resources = resp.TaskSet.Task + return ret, nil +} + +func (c Task) GetID(resource *ecs.Task) string { + return resource.TaskId +} + +func (c Task) Type() string { + return "task" +} + +func (c Task) BatchSize() int { + return batchSize +} + +func TaskSattled(t *ecs.Task) bool { + return t.TaskStatus == "Finished" || t.TaskStatus == "Failed" +} diff --git a/pkg/disk/modify.go b/pkg/disk/modify.go new file mode 100644 index 000000000..259f097f5 --- /dev/null +++ b/pkg/disk/modify.go @@ -0,0 +1,206 @@ +package disk + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" + "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" + "github.com/go-logr/logr" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +type ModifyParameters struct { + Category Category + PerformanceLevel PerformanceLevel + ProvisionedIops *int + BurstingEnabled *bool + Tags []ecs.TagResourcesTag + RemoveTags []string +} + +type ModifyServer struct { + ecsClient cloud.ECSInterface + taskWaiter waitstatus.StatusWaiter[ecs.Task] + runningTaskIDs sync.Map +} + +type errTaskNotFinished struct { + TaskID string +} + +func (e errTaskNotFinished) Error() string { + return fmt.Sprintf("task %s is not finished yet", e.TaskID) +} + +func (errTaskNotFinished) Is(target error) bool { + return target == context.DeadlineExceeded +} + +func (m *ModifyServer) waitForTask(ctx context.Context, logger logr.Logger, taskID string) (*ecs.Task, error) { + task, err := m.taskWaiter.WaitFor(ctx, taskID, desc.TaskSattled) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return task, errTaskNotFinished{taskID} + } + return task, fmt.Errorf("error while waiting for task %s to finish: %w", taskID, err) + } + logger.V(2).Info("task finished", "task", task.TaskId, "status", task.TaskStatus, "finishedTime", task.FinishedTime) + return task, nil +} + +func (m *ModifyServer) retrieveTask(logger logr.Logger, diskID string) (*ecs.Task, error) { + req := ecs.CreateDescribeTasksRequest() + req.PageSize = requests.NewInteger(1) + req.ResourceIds = ptr.To([]string{diskID}) + req.TaskAction = "ModifyDiskSpec" + + resp, err := wrap.V1(logger, m.ecsClient.DescribeTasks)(req) + if err != nil { + return nil, err + } + var task *ecs.Task + if len(resp.TaskSet.Task) > 0 { + task = &resp.TaskSet.Task[0] + } + return task, nil +} + +func (m *ModifyServer) modifyDiskSpec(ctx context.Context, logger logr.Logger, diskID, category, performanceLevel string, ProvisionedIops *int) error { + req := ecs.CreateModifyDiskSpecRequest() + req.DiskId = diskID + req.DiskCategory = category + req.PerformanceLevel = performanceLevel + if ProvisionedIops != nil { + req.ProvisionedIops = requests.NewInteger(*ProvisionedIops) + } + + resp, err := wrap.V1(logger, m.ecsClient.ModifyDiskSpec)(req) + if err != nil { + if errors.Is(err, wrap.ErrorCode("NoChangeInDiskCategoryAndPerformanceLevel")) { + logger.V(2).Info("disk spec not changed") + return nil + } + return fmt.Errorf("error while modifying disk %s: %w", diskID, err) + } + + logger.V(2).Info("modifying disk spec", "task", resp.TaskId) + m.runningTaskIDs.Store(diskID, resp.TaskId) + task, err := m.waitForTask(ctx, logger, resp.TaskId) + if err != nil { + return err + } + m.runningTaskIDs.Delete(diskID) + if task.TaskStatus != "Finished" { + return fmt.Errorf("unexpected task status %s", task.TaskStatus) + } + return nil +} + +func (m *ModifyServer) modifyDiskAttribute(ctx context.Context, logger logr.Logger, diskID string, burstingEnabled bool) error { + var err error + for range 3 { + req := ecs.CreateModifyDiskAttributeRequest() + req.DiskId = diskID + req.BurstingEnabled = requests.NewBoolean(burstingEnabled) + + _, err = wrap.V1(logger, m.ecsClient.ModifyDiskAttribute)(req) + if err == nil { + logger.V(2).Info("modified disk bursting enabled", "enabled", burstingEnabled) + return nil + } + if errors.Is(err, wrap.ErrorCode("BurstingEnabledForModifyingDiskUnsupported")) { + // TODO: ECS will optimize this, and we may remove this retry in the future + logger.V(2).Info("disk still modifying, retrying") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + continue + } + } else { + return fmt.Errorf("error while modifying disk %s bursting enabled: %w", diskID, err) + } + } + return err +} + +func (m *ModifyServer) Modify(ctx context.Context, diskID string, params ModifyParameters) error { + logger := klog.FromContext(ctx) + +waitPrevious: + taskObj, ok := m.runningTaskIDs.Load(diskID) + if ok { + taskID := taskObj.(string) + logger.V(2).Info("wait for running task", "task", taskID) + _, err := m.waitForTask(ctx, logger, taskID) + if err != nil { + return err + } + m.runningTaskIDs.Delete(diskID) + } + + if len(params.Category) > 0 || len(params.PerformanceLevel) > 0 || params.ProvisionedIops != nil { + err := m.modifyDiskSpec(ctx, logger, diskID, string(params.Category), string(params.PerformanceLevel), params.ProvisionedIops) + if err != nil { + if errors.Is(err, wrap.ErrorCode(IncorrectDiskStatus)) { + // Already modifying? This can happen if CSI was restarted and runningTaskIDs lost + logger.V(2).Info("maybe disk is being modified, recovering previous task") + task, errTask := m.retrieveTask(logger, diskID) + if errTask != nil { + return fmt.Errorf("IncorrectDiskStatus, then retrieve task failed: %v", errTask) + } + if task == nil || task.TaskStatus != "Processing" { + return fmt.Errorf("IncorrectDiskStatus, and no processing task found") + } + m.runningTaskIDs.Store(diskID, task.TaskId) + goto waitPrevious + } + return err + } + } + + // Should goes after ModifyDiskSpec, because we may need to modify category to cloud_auto first + if params.BurstingEnabled != nil { + err := m.modifyDiskAttribute(ctx, logger, diskID, *params.BurstingEnabled) + if err != nil { + return err + } + } + + // Remove tags before adding new ones, to avoid maximum tag count limit. + if len(params.RemoveTags) > 0 { + req := ecs.CreateUntagResourcesRequest() + req.ResourceType = "disk" + req.ResourceId = ptr.To([]string{diskID}) + req.TagKey = ¶ms.RemoveTags + + _, err := wrap.V1(logger, m.ecsClient.UntagResources)(req) + if err != nil { + return fmt.Errorf("error while untagging disk %s: %w", diskID, err) + } + logger.V(2).Info("untagged disk") + } + + if len(params.Tags) > 0 { + req := ecs.CreateTagResourcesRequest() + req.ResourceType = "disk" + req.ResourceId = ptr.To([]string{diskID}) + req.Tag = ¶ms.Tags + + _, err := wrap.V1(logger, m.ecsClient.TagResources)(req) + if err != nil { + return fmt.Errorf("error while tagging disk %s: %w", diskID, err) + } + logger.V(2).Info("tagged disk") + } + return nil +} diff --git a/pkg/disk/utils.go b/pkg/disk/utils.go index b2447de2f..292294bca 100644 --- a/pkg/disk/utils.go +++ b/pkg/disk/utils.go @@ -65,7 +65,6 @@ var ( ) const ( - DISK_TAG_PREFIX = "diskTags/" instanceTypeInfoAnnotation = "alibabacloud.com/instance-type-info" ) @@ -563,7 +562,7 @@ func getDiskVolumeOptions(req *csi.CreateVolumeRequest) (*diskVolumeArgs, error) // disk Type diskType, err := validateDiskType(volOptions) if err != nil { - return nil, fmt.Errorf("Illegal required parameter type: " + volOptions["type"]) + return nil, fmt.Errorf("Illegal required parameter type: " + volOptions[DISK_TYPE]) } diskVolArgs.Type = diskType pls, err := validateDiskPerformanceLevel(volOptions) @@ -708,11 +707,11 @@ func getDiskVolumeOptions(req *csi.CreateVolumeRequest) (*diskVolumeArgs, error) } func validateDiskType(opts map[string]string) (diskType []Category, err error) { - if value, ok := opts["type"]; !ok || (ok && value == DiskHighAvail) { + if value, ok := opts[DISK_TYPE]; !ok || (ok && value == DiskHighAvail) { diskType = []Category{DiskSSD, DiskEfficiency} return } - for _, cusType := range strings.Split(opts["type"], ",") { + for _, cusType := range strings.Split(opts[DISK_TYPE], ",") { c := Category(cusType) if _, ok := AllCategories[c]; ok { diskType = append(diskType, c) @@ -721,7 +720,7 @@ func validateDiskType(opts map[string]string) (diskType []Category, err error) { } } if len(diskType) == 0 { - return diskType, fmt.Errorf("Illegal required parameter type: " + opts["type"]) + return diskType, fmt.Errorf("Illegal required parameter type: " + opts[DISK_TYPE]) } return } @@ -772,6 +771,65 @@ func validateCapabilities(capabilities []*csi.VolumeCapability) (bool, error) { } return multiAttachRequired, nil } +func parseMutableParameters(mutableParameters map[string]string) (ModifyParameters, error) { + var params ModifyParameters + for k, v := range mutableParameters { + switch k { + case DISK_TYPE: + params.Category = Category(v) + case ESSD_PERFORMANCE_LEVEL: + params.PerformanceLevel = PerformanceLevel(v) + case PROVISIONED_IOPS_KEY: + iops, err := strconv.Atoi(v) + if err != nil { + return params, fmt.Errorf("invalid %s: %w", PROVISIONED_IOPS_KEY, err) + } + params.ProvisionedIops = &iops + case BURSTING_ENABLED_KEY: + en, err := strconv.ParseBool(v) + if err != nil { + return params, fmt.Errorf("invalid %s: %w", BURSTING_ENABLED_KEY, err) + } + params.BurstingEnabled = &en + default: + switch { + case strings.HasPrefix(k, DISK_TAG_PREFIX): + tagKey := k[len(DISK_TAG_PREFIX):] + params.Tags = append(params.Tags, ecs.TagResourcesTag{ + Key: tagKey, + Value: v, + }) + case strings.HasPrefix(k, REMOVE_DISK_TAG_PREFIX): + tagKey := k[len(REMOVE_DISK_TAG_PREFIX):] + params.RemoveTags = append(params.RemoveTags, tagKey) + default: + return params, fmt.Errorf("unknown parameter %s", k) + } + } + } + return params, nil +} + +func importMutableParameters(params *diskVolumeArgs, mutable *ModifyParameters) { + if len(mutable.Category) > 0 { + params.Type = []Category{mutable.Category} + } + if len(mutable.PerformanceLevel) > 0 { + params.PerformanceLevel = []PerformanceLevel{mutable.PerformanceLevel} + } + if mutable.ProvisionedIops != nil { + params.ProvisionedIops = int64(*mutable.ProvisionedIops) + } + if mutable.BurstingEnabled != nil { + params.BurstingEnabled = *mutable.BurstingEnabled + } + for _, tag := range mutable.RemoveTags { + delete(params.DiskTags, tag) + } + for _, tag := range mutable.Tags { + params.DiskTags[tag.Key] = tag.Value + } +} func getMountedVolumeDevice(mnts []k8smount.MountInfo, targetPath string) string { for _, mnt := range mnts {