Skip to content

Commit

Permalink
[build][feat] add event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
cuongpiger committed Jul 1, 2024
1 parent 36c286f commit 89820cc
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 27 deletions.
4 changes: 4 additions & 0 deletions pkg/cloud/entity/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ type ListSnapshots struct {
func (s *ListSnapshots) Len() int {
return len(s.Items)
}

func (s *ListSnapshots) IsEmpty() bool {
return s.Len() < 1
}
65 changes: 40 additions & 25 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type controllerService struct {
modifyVolumeManager *modifyVolumeManager
driverOptions *DriverOptions
k8sClient lsk8s.IKubernetes
recorder lk8srecord.EventRecorder
broadcaster lk8srecord.EventBroadcaster

lvmrpc.UnimplementedModifyServer
Expand Down Expand Up @@ -68,8 +67,7 @@ func newControllerService(pdriOpts *DriverOptions) controllerService {
inFlight: lsinternal.NewInFlight(),
driverOptions: pdriOpts,
modifyVolumeManager: newModifyVolumeManager(),
k8sClient: lsk8s.NewKubernetes(k8sClient),
recorder: recorder,
k8sClient: lsk8s.NewKubernetes(k8sClient, recorder),
broadcaster: broadcaster,
}
}
Expand All @@ -79,18 +77,20 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol
serr lserr.IError
)

llog.V(5).InfoS("[INFO] - CreateVolume: called", "preq", *preq)
llog.V(5).InfoS("[INFO] - CreateVolume: Called", "request", *preq)

// Validate the create volume request
if err := validateCreateVolumeRequest(preq); err != nil {
llog.ErrorS(err, "[ERROR] - CreateVolume: invalid request")
llog.ErrorS(err, "[ERROR] - CreateVolume: Invalid request", "request", *preq)
ns, name := getCreateVolumeRequestNamespacedName(preq)
s.k8sClient.PersistentVolumeClaimEventWarning(pctx, ns, name, "CsiCreateVolumeInvalidRequest", err.Error())
return nil, err
}

// Validate volume size, if volume size is less than the default volume size of cloud provider, set it to the default volume size
volSizeBytes, err := s.getVolSizeBytes(preq)
if err != nil {
llog.ErrorS(err, "[ERROR] - CreateVolume: failed to get volume size")
llog.ErrorS(err, "[ERROR] - CreateVolume: Failed to get volume size")
return nil, ErrFailedToValidateVolumeSize(preq.GetName(), err)
}

Expand All @@ -100,22 +100,27 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol

// check if a request is already in-flight
if ok := s.inFlight.Insert(volName); !ok {
llog.V(5).InfoS("[INFO] - CreateVolume: Volume is already in-flight", "volumeName", volName)
llog.InfoS("[INFO] - CreateVolume: Operation is already in-flight", "volumeName", volName, "inflightKey", volName)
return nil, ErrVolumeIsCreating(volName)
}
defer s.inFlight.Delete(volName)

llog.InfoS("[INFO] - CreateVolume: Insert this action to inflight cache", "volumeName", volName, "inflightKey", volName)
defer func() {
llog.InfoS("[INFO] - CreateVolume: Operation completed", "volumeName", volName, "inflightKey", volName)
s.inFlight.Delete(volName)
}()

if _, serr = s.cloud.GetVolumeByName(volName); serr != nil {
if !serr.IsError(lsdkErrs.EcVServerVolumeNotFound) {
llog.ErrorS(serr.GetError(), "[ERROR] - CreateVolume: failed to get volume", "volumeName", volName)
llog.ErrorS(serr.GetError(), "[ERROR] - CreateVolume: Failed to get volume", "volumeName", volName)
return nil, ErrFailedToListVolumeByName(volName)
}
}

cvr := NewCreateVolumeRequest().WithDriverOptions(s.driverOptions)
parser, _ := ljoat.GetParser()
for pk, pv := range preq.GetParameters() {
llog.InfoS("[INFO] - CreateVolume: parsing request parameters", "key", pk, "value", pv)
llog.InfoS("[INFO] - CreateVolume: Parsing request parameters", "key", pk, "value", pv)
switch lstr.ToLower(pk) {
case VolumeTypeKey:
cvr = cvr.WithVolumeTypeID(pv)
Expand Down Expand Up @@ -159,12 +164,7 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol
}
}

modifyOpts, err := parseModifyVolumeParameters(preq.GetMutableParameters())
if err != nil {
llog.ErrorS(err, "[ERROR] - CreateVolume: invalid request")
return nil, ErrModifyMutableParam
}

modifyOpts, _ := parseModifyVolumeParameters(preq.GetMutableParameters())
volumeSource := preq.GetVolumeContentSource()
if volumeSource != nil {
if _, ok := volumeSource.GetType().(*lcsi.VolumeContentSource_Snapshot); !ok {
Expand All @@ -179,7 +179,7 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol

respCtx, err := cvr.ToResponseContext(volCap)
if err != nil {
llog.ErrorS(err, "[ERROR] - CreateVolume: failed to parse response context", "volumeID", volName)
llog.ErrorS(err, "[ERROR] - CreateVolume: Failed to parse response context", "volumeID", volName)
return nil, err
}

Expand Down Expand Up @@ -220,35 +220,42 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol
return newCreateVolumeResponse(resp, cvr, respCtx), nil
}

func (s *controllerService) DeleteVolume(pctx lctx.Context, preq *lcsi.DeleteVolumeRequest) (*lcsi.DeleteVolumeResponse, error) {
llog.V(4).InfoS("[INFO] - DeleteVolume: called", "args", *preq)
func (s *controllerService) DeleteVolume(_ lctx.Context, preq *lcsi.DeleteVolumeRequest) (*lcsi.DeleteVolumeResponse, error) {
llog.V(4).InfoS("[INFO] - DeleteVolume: called", "request", *preq)

if err := validateDeleteVolumeRequest(preq); err != nil {
llog.Errorf("[ERROR] - DeleteVolume: invalid request")
llog.ErrorS(err, "[ERROR] - DeleteVolume: Invalid request", "request", *preq)
return nil, err
}

volumeID := preq.GetVolumeId()
// check if a request is already in-flight
if ok := s.inFlight.Insert(volumeID); !ok {
llog.InfoS("[INFO] - DeleteVolume: Operation is already in-flight", "volumeID", volumeID)
return nil, ErrOperationAlreadyExists(volumeID)
}
defer s.inFlight.Delete(volumeID)

llog.InfoS("[INFO] - DeleteVolume: Insert this action to inflight cache", "volumeID", volumeID, "inflightKey", volumeID)
defer func() {
llog.InfoS("[INFO] - DeleteVolume: Operation completed", "volumeID", volumeID, "inflightKey", volumeID)
s.inFlight.Delete(volumeID)
}()

// So the volume MUST NOT truly be deleted if it has at least one snapshot
lstSnapshots, ierr := s.cloud.ListSnapshots(volumeID, 1, 10)
if ierr != nil {
llog.ErrorS(ierr.GetError(), "[ERROR] - DeleteVolume: failed to list snapshots", "volumeID", volumeID)
llog.ErrorS(ierr.GetError(), "[ERROR] - DeleteVolume: Failed to list snapshots", "volumeId", volumeID)
return nil, ErrFailedToListSnapshot(volumeID)
}

if lstSnapshots.Len() > 0 {
if !lstSnapshots.IsEmpty() {
llog.ErrorS(nil, "[ERROR] - DeleteVolume: CANNOT delete this volume because of having snapshots", "volumeId", volumeID)
return nil, ErrDeleteVolumeHavingSnapshots(volumeID)
}

if err := s.cloud.DeleteVolume(volumeID); err != nil {
if err != nil {
llog.ErrorS(err.GetError(), "[ERROR] - DeleteVolume: failed to delete volume", "volumeID", volumeID)
llog.ErrorS(err.GetError(), "[ERROR] - DeleteVolume: Failed to delete volume", "volumeID", volumeID)
return nil, ErrFailedToDeleteVolume(volumeID)
}
}
Expand Down Expand Up @@ -309,7 +316,7 @@ func (s *controllerService) ControllerUnpublishVolume(_ lctx.Context, preq *lcsi
return nil, ErrOperationAlreadyExists(volumeID)
}

llog.V(5).InfoS("[INFO] - ControllerUnpublishVolume: Insert this action to inflight cach3", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
llog.V(5).InfoS("[INFO] - ControllerUnpublishVolume: Insert this action to inflight cache", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
defer func() {
llog.InfoS("[INFO] - ControllerUnpublishVolume: Operation completed", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key)
s.inFlight.Delete(volumeID + nodeID)
Expand Down Expand Up @@ -749,3 +756,11 @@ func parsePage(nextToken string) int {

return page
}

func getCreateVolumeRequestNamespacedName(preq *lcsi.CreateVolumeRequest) (string, string) {
params := preq.GetParameters()
if params != nil {
return params[PVCNamespaceKey], params[PVCNameKey]
}
return "", ""
}
4 changes: 4 additions & 0 deletions pkg/k8s/ik8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ import (
type IKubernetes interface {
GetPersistentVolumeClaimByName(pctx lctx.Context, pnamespace, pname string) (*lsentity.PersistentVolumeClaim, lserr.IError)
GetStorageClassByName(pctx lctx.Context, pname string) (*lsentity.StorageClass, lserr.IError)

// Event recorder
PersistentVolumeClaimEventWarning(pctx lctx.Context, pnamespace, pname, preason, pmessage string)
PersistentVolumeClaimEventNormal(pctx lctx.Context, pnamespace, pname, preason, pmessage string)
}
32 changes: 30 additions & 2 deletions pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ package k8s
import (
lctx "context"

lcoreV1 "k8s.io/api/core/v1"
lmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
lk8s "k8s.io/client-go/kubernetes"
lk8srecord "k8s.io/client-go/tools/record"

lsentity "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/cloud/entity"
lserr "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/cloud/errors"
)

type kubernetes struct {
lk8s.Interface
lk8srecord.EventRecorder
}

func NewKubernetes(pk8sclient lk8s.Interface) IKubernetes {
func NewKubernetes(pk8sclient lk8s.Interface, precorder lk8srecord.EventRecorder) IKubernetes {
return &kubernetes{
Interface: pk8sclient,
Interface: pk8sclient,
EventRecorder: precorder,
}
}

Expand Down Expand Up @@ -45,3 +49,27 @@ func (s *kubernetes) GetStorageClassByName(pctx lctx.Context, pname string) (*ls

return lsentity.NewStorageClass(sc), nil
}

func (s *kubernetes) PersistentVolumeClaimEventWarning(pctx lctx.Context, pnamespace, pname, preason, pmessage string) {
if pnamespace == "" || pname == "" {
return
}

pvc, err := s.GetPersistentVolumeClaimByName(pctx, pnamespace, pname)
if err != nil || pvc == nil {
return
}
s.EventRecorder.Event(pvc.PersistentVolumeClaim, lcoreV1.EventTypeWarning, preason, pmessage)
}

func (s *kubernetes) PersistentVolumeClaimEventNormal(pctx lctx.Context, pnamespace, pname, preason, pmessage string) {
if pnamespace == "" || pname == "" {
return
}

pvc, err := s.GetPersistentVolumeClaimByName(pctx, pnamespace, pname)
if err != nil || pvc == nil {
return
}
s.EventRecorder.Event(pvc.PersistentVolumeClaim, lcoreV1.EventTypeNormal, preason, pmessage)
}

0 comments on commit 89820cc

Please sign in to comment.