Skip to content

Commit

Permalink
Merge pull request #924 from riteshghorse/resource-leak
Browse files Browse the repository at this point in the history
Change error reporting to temporary in CreateVolume workflow
  • Loading branch information
k8s-ci-robot authored Sep 6, 2024
2 parents 47e2dbe + 60c6e08 commit 2fbd1f1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 16 deletions.
20 changes: 14 additions & 6 deletions pkg/cloud_provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/common"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"

filev1beta1 "google.golang.org/api/file/v1beta1"
Expand Down Expand Up @@ -313,7 +314,7 @@ func (manager *gcfsServiceManager) CreateInstance(ctx context.Context, obj *Serv
err = manager.waitForOp(ctx, op)
if err != nil {
klog.Errorf("WaitFor CreateInstance op %s failed: %v", op.Name, err)
return nil, err
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("unknown error when polling the operation: %w", err))
}
serviceInstance, err := manager.GetInstance(ctx, obj)
if err != nil {
Expand All @@ -328,14 +329,14 @@ func (manager *gcfsServiceManager) GetInstance(ctx context.Context, obj *Service
instance, err := manager.instancesService.Get(instanceUri).Context(ctx).Do()
if err != nil {
klog.Errorf("Failed to get instance %v", instanceUri)
return nil, err
return nil, common.NewTemporaryError(codes.Unavailable, err)
}

if instance != nil {
klog.V(4).Infof("GetInstance call fetched instance %+v", instance)
return cloudInstanceToServiceInstance(instance)
}
return nil, fmt.Errorf("failed to get instance %v", instanceUri)
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("failed to get instance %v", instanceUri))
}

func cloudInstanceToServiceInstance(instance *filev1beta1.Instance) (*ServiceInstance, error) {
Expand Down Expand Up @@ -714,6 +715,13 @@ func existingErrorCode(err error) *codes.Code {
return nil
}

var te *common.TemporaryError
// explicitly check if the error type is a `common.TemporaryError`.
if errors.As(err, &te) {
if status, ok := status.FromError(err); ok {
return util.ErrCodePtr(status.Code())
}
}
// We want to make sure we catch other error types that are statusable.
// (eg. grpc-go/internal/status/status.go Error struct that wraps a status)
var googleErr *googleapi.Error
Expand Down Expand Up @@ -1030,7 +1038,7 @@ func (manager *gcfsServiceManager) StartCreateShareOp(ctx context.Context, share

op, err := manager.multishareInstancesSharesService.Create(instanceuri, targetshare).ShareId(share.Name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("CreateShare operation failed: %w", err)
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("CreateShare operation failed: %w", err))
}
klog.Infof("Started Create Share op %s for share %q instance uri %q, with capacity(GB) %v, Labels %v", op.Name, share.Name, instanceuri, targetshare.CapacityGb, targetshare.Labels)
return op, nil
Expand Down Expand Up @@ -1082,7 +1090,7 @@ func (manager *gcfsServiceManager) GetOp(ctx context.Context, op string) (*filev
func (manager *gcfsServiceManager) GetShare(ctx context.Context, obj *Share) (*Share, error) {
sobj, err := manager.multishareInstancesSharesService.Get(shareURI(obj.Parent.Project, obj.Parent.Location, obj.Parent.Name, obj.Name)).Context(ctx).Do()
if err != nil {
return nil, err
return nil, common.NewTemporaryError(codes.Unavailable, err)
}

_, _, _, shareName, err := util.ParseShareURI(sobj.Name)
Expand All @@ -1091,7 +1099,7 @@ func (manager *gcfsServiceManager) GetShare(ctx context.Context, obj *Share) (*S
}
instance, err := manager.GetMultishareInstance(ctx, obj.Parent)
if err != nil {
return nil, err
return nil, common.NewTemporaryError(codes.Unavailable, err)
}

return &Share{
Expand Down
64 changes: 64 additions & 0 deletions pkg/common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TemporaryError wraps an error with the a temporary error code required by
// other services like `csi-provisioner`.
// It implements the error interface. Do not return `TemporaryError` directly
// from CSI Spec API calls, as CSI Spec API calls MUST return a
// standard gRPC status. If TemporaryErrors are returned from
// helper functions within a CSI Spec API method, make sure the outer CSI
// Spec API method returns a standard gRPC status.
type TemporaryError struct {
err error
code codes.Code
}

// Unwrap extracts the original error.
func (t *TemporaryError) Unwrap() error {
return t.err
}

// GRPCStatus extracts the underlying gRPC Status error.
// This method is necessary to fulfill the grpcstatus interface
// described in https://pkg.go.dev/google.golang.org/grpc/status#FromError.
// `FromError` is used in `CodeForError` to get existing error codes
// from status errors.
func (t *TemporaryError) GRPCStatus() *status.Status {
if t.err == nil {
return status.New(codes.OK, "")
}
return status.New(t.code, t.err.Error())
}

// NewTemporaryError constructs a new `TemporaryError` instance.
//
// This function creates a TemporaryError by wrapping the given error (`err`)
// and assigning it a specific error code (`code`).
func NewTemporaryError(code codes.Code, err error) *TemporaryError {
return &TemporaryError{err: err, code: code}
}

// Error returns a readable representation of the TemporaryError.
func (t *TemporaryError) Error() string {
return t.err.Error()
}
8 changes: 5 additions & 3 deletions pkg/csi_driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu

if err != nil {
klog.Errorf("CreateVolume returned an error %v, for request %+v", err, req)
return nil, err
return nil, file.StatusError(err)
}
klog.Infof("CreateVolume response %v, for request %+v", response, req)
return response, nil
Expand Down Expand Up @@ -241,6 +241,8 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
filer, err := s.config.fileService.GetInstance(ctx, newFiler)
// No error is returned if the instance is not found during CreateVolume.
if err != nil && !file.IsNotFoundErr(err) {
// Failed to GetInstance, however the Filestore instance may already be created.
// The error should be non-final.
return nil, file.StatusError(err)
}

Expand All @@ -259,11 +261,11 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
if filer.State != "READY" {
msg := fmt.Sprintf("Volume %v not ready, current state: %v", name, filer.State)
klog.V(4).Infof(msg)
return nil, status.Error(codes.Internal, msg)
return nil, status.Errorf(codes.Unavailable, msg)
}
} else {
param := req.GetParameters()
// If we are creating a new instance, we need pick an unused CIDR range from reserved-ipv4-cidr
// If we are creating a new instance, we need to pick an unused CIDR range from reserved-ipv4-cidr
// If the param was not provided, we default reservedIPRange to "" and cloud provider takes care of the allocation
if newFiler.Network.ConnectMode == privateServiceAccess {
if reservedIPRange, ok := param[ParamReservedIPRange]; ok {
Expand Down
7 changes: 4 additions & 3 deletions pkg/csi_driver/multishare_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/klog/v2"
cloud "sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider/file"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/common"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"
)

Expand Down Expand Up @@ -201,7 +202,7 @@ func (m *MultishareController) CreateVolume(ctx context.Context, req *csi.Create
// lock released. poll for op.
err = m.waitOnWorkflow(ctx, workflow)
if err != nil {
return nil, file.StatusError(fmt.Errorf("Create Volume failed, operation %q poll error: %w", workflow.opName, err))
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("Create Volume failed, operation %q poll error: %w", workflow.opName, err))
}

klog.Infof("Poll for operation %s (type %s) completed", workflow.opName, workflow.opType.String())
Expand Down Expand Up @@ -229,7 +230,7 @@ func (m *MultishareController) CreateVolume(ctx context.Context, req *csi.Create
// lock released. poll for share create op.
err = m.waitOnWorkflow(ctx, shareCreateWorkflow)
if err != nil {
return nil, file.StatusError(fmt.Errorf("%v operation %q poll error: %w", shareCreateWorkflow.opType, shareCreateWorkflow.opName, err))
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("%v operation %q poll error: %w", shareCreateWorkflow.opType, shareCreateWorkflow.opName, err))
}
resp, err := m.getShareAndGenerateCSICreateVolumeResponse(ctx, instanceScPrefix, newShare, maxShareSizeSizeBytes)
return resp, file.StatusError(err)
Expand Down Expand Up @@ -343,7 +344,7 @@ func (m *MultishareController) createNewBackup(ctx context.Context, backupInfo *
func (m *MultishareController) getShareAndGenerateCSICreateVolumeResponse(ctx context.Context, instancePrefix string, s *file.Share, maxShareSizeSizeBytes int64) (*csi.CreateVolumeResponse, error) {
share, err := m.cloud.File.GetShare(ctx, s)
if err != nil {
return nil, err
return nil, common.NewTemporaryError(codes.Unavailable, err)
}

if share.State != "READY" {
Expand Down
11 changes: 7 additions & 4 deletions pkg/csi_driver/multishare_stateful_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package driver

import (
"fmt"

csi "github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand All @@ -29,6 +31,7 @@ import (
listers "sigs.k8s.io/gcp-filestore-csi-driver/pkg/client/listers/multishare/v1"
cloud "sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/cloud_provider/file"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/common"
"sigs.k8s.io/gcp-filestore-csi-driver/pkg/util"
)

Expand Down Expand Up @@ -94,13 +97,13 @@ func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *cs
shareInfo, err := m.shareLister.ShareInfos(util.ManagedFilestoreCSINamespace).Get(pvName)
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error getting shareInfo %q from informer: %s", pvName, err.Error())
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error getting shareInfo %q from informer: %s", pvName, err.Error()))
}
klog.Infof("querying ShareInfo %q from api server", pvName)
shareInfo, err = m.clientset.MultishareV1().ShareInfos(util.ManagedFilestoreCSINamespace).Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return nil, status.Errorf(codes.Internal, "error getting shareInfo %q from api server: %s", pvName, err.Error())
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error getting shareInfo %q from api server: %s", pvName, err.Error()))
}
klog.V(6).Infof("shareInfo object for share %q not found in API server", pvName)
shareInfo = nil
Expand Down Expand Up @@ -131,7 +134,7 @@ func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *cs
klog.V(6).Infof("trying to create shareInfo object: %v", shareInfo)
shareInfo, err = m.createShareInfo(ctx, shareInfo)
if err != nil {
return nil, status.Errorf(codes.Internal, "error creating share object: %s", err.Error())
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error creating share object: %s", err.Error()))
}
}

Expand All @@ -141,7 +144,7 @@ func (m *MultishareStatefulController) CreateVolume(ctx context.Context, req *cs

if shareInfo.Status.ShareStatus != v1.READY {
if shareInfo.Status.Error != "" {
return nil, status.Errorf(codes.Internal, "internal error: %s", shareInfo.Status.Error)
return nil, common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error fetching share status: %s", shareInfo.Status.Error))
}
return nil, status.Errorf(codes.Aborted, "share %s is not ready yet", pvName)
}
Expand Down

0 comments on commit 2fbd1f1

Please sign in to comment.