Skip to content

Commit

Permalink
Fix volume backup issues: group volumes for backup by PersistentVolumeClaim (PVC) instead of PersistentVolume (PV)
Browse files Browse the repository at this point in the history
  • Loading branch information
AmitRoushan committed Jul 11, 2023
1 parent 080d5da commit 9895df7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 39 deletions.
42 changes: 21 additions & 21 deletions controllers/backup/backupcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (ctrl *controller) processBackup(key string) error {
}

// backup volume
volumeResources, err := resourceCache.GetByGVK(k8sresource.PersistentVolumeGVK)
volumeResources, err := resourceCache.GetByGVK(k8sresource.PersistentVolumeClaimGVK)
if err != nil {
ctrl.logger.Errorf("Unable to get persistent volume resources from cache. %s", err)
return err
Expand Down Expand Up @@ -611,26 +611,26 @@ func (ctrl *controller) handlePodVolumeSnapshot(backup *kahuapi.Backup,
return backup, nil, err
}

pvs := make([]k8sresource.Resource, 0)
for _, pvc := range pvcs {
pv, err := ctrl.kubeClient.CoreV1().
PersistentVolumes().
Get(ctrl.ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return backup, nil, err
}

// populate pv apiversion info
pv.APIVersion = k8sresource.PersistentVolumeGVK.GroupVersion().String()
pv.Kind = k8sresource.PersistentVolumeGVK.Kind
resource, err := k8sresource.ToResource(pv)
if err != nil {
return backup, nil, err
}
pvs = append(pvs, resource)
}

volumeGroups, err := ctrl.volumeHandler.Group().ByPV(pvs, group.WithProvisioner())
// pvs := make([]k8sresource.Resource, 0)
// for _, pvc := range pvcs {
// pv, err := ctrl.kubeClient.CoreV1().
// PersistentVolumes().
// Get(ctrl.ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
// if err != nil {
// return backup, nil, err
// }

// // populate pv apiversion info
// pv.APIVersion = k8sresource.PersistentVolumeGVK.GroupVersion().String()
// pv.Kind = k8sresource.PersistentVolumeGVK.Kind
// resource, err := k8sresource.ToResource(pv)
// if err != nil {
// return backup, nil, err
// }
// pvs = append(pvs, resource)
// }

volumeGroups, err := ctrl.volumeHandler.Group().ByPVC(pvcs, group.WithProvisioner())
if err != nil {
ctrl.logger.Errorf("Failed to ensure volume group. %s", err)
if _, err = ctrl.updateBackupStatusWithEvent(backup, kahuapi.BackupStatus{
Expand Down
26 changes: 15 additions & 11 deletions controllers/backup/backupvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
)

func (ctrl *controller) processVolumeBackup(backup *kahuapi.Backup,
pvResources []k8sresource.Resource,
pvcResources []k8sresource.Resource,
bl resourcebackup.Interface) (*kahuapi.Backup, error) {
ctrl.logger.Infof("Processing Volume backup(%s)", backup.Name)
if len(pvResources) == 0 {
if len(pvcResources) == 0 {
// set annotation for
return backup, nil
}
Expand All @@ -52,16 +52,21 @@ func (ctrl *controller) processVolumeBackup(backup *kahuapi.Backup,
// filter snapshot volumes and backup rest of volumes
if len(snapshotVolumes) > 0 {
filteredPV := make([]k8sresource.Resource, 0)
for _, pvResource := range pvResources {
if snapshotVolumes.Has(pvResource.GetName()) {
for _, pvcResource := range pvcResources {
if snapshotVolumes.Has(pvcResource.GetName()) {
continue
}
filteredPV = append(filteredPV, pvResource)
filteredPV = append(filteredPV, pvcResource)
}
pvResources = filteredPV
pvcResources = filteredPV
}

return ctrl.backupVolumes(backup, pvResources, bl)
pvcs, err := ctrl.getVolumes(backup, pvcResources)
if err != nil {
return backup, err
}

return ctrl.backupVolumes(backup, pvcs, bl)
}

func (ctrl *controller) backupSnapshotVolumes(backup *kahuapi.Backup,
Expand All @@ -82,15 +87,15 @@ func (ctrl *controller) backupSnapshotVolumes(backup *kahuapi.Backup,
}

func (ctrl *controller) backupVolumes(backup *kahuapi.Backup,
volumes []k8sresource.Resource,
volumes []*corev1.PersistentVolumeClaim,
bl resourcebackup.Interface) (*kahuapi.Backup, error) {
ctrl.logger.Infof("Volume backup (%s) started", backup.Name)
if len(volumes) == 0 {
return backup, nil
}

// create groups based on snapshot and non snapshot volumes
volumeGroups, err := ctrl.volumeHandler.Group().ByPV(volumes, group.WithProvisioner())
volumeGroups, err := ctrl.volumeHandler.Group().ByPVC(volumes, group.WithProvisioner())
if err != nil {
return backup, err
}
Expand Down Expand Up @@ -244,7 +249,7 @@ func (ctrl *controller) getVolumes(
backup *kahuapi.Backup,
resources []k8sresource.Resource) ([]*corev1.PersistentVolumeClaim, error) {
// retrieve all persistent volumes for backup
ctrl.logger.Infof("Getting PersistentVolume for backup(%s)", backup.Name)
ctrl.logger.Infof("Getting PersistentVolumeClaim for backup(%s)", backup.Name)
pvcs := make([]*corev1.PersistentVolumeClaim, 0)
for _, resource := range resources {
pvc := new(corev1.PersistentVolumeClaim)
Expand All @@ -256,7 +261,6 @@ func (ctrl *controller) getVolumes(
}

pvcs = append(pvcs, pvc)

}

return pvcs, nil
Expand Down
35 changes: 28 additions & 7 deletions volume/group/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
)

type Factory interface {
ByPV([]k8sresource.Resource, ...groupFunc) ([]Interface, error)
ByPVC([]*corev1.PersistentVolumeClaim, ...groupFunc) ([]Interface, error)
BySnapshot(snapshots []*kahuapi.VolumeSnapshot, groupings ...groupFunc) ([]Interface, error)
}

Expand All @@ -64,18 +64,18 @@ func NewFactory(clientFactory client.Factory) (Factory, error) {
}, nil
}

type groupFunc func(pvs []k8sresource.Resource) []Interface
type groupFunc func(kubernetes.Interface, []k8sresource.Resource) []Interface

// WithProvisioner returns a groupFunc that partitions volume resources
// by their storage provisioner (delegates to withProvisioner).
func WithProvisioner() groupFunc {
	return withProvisioner
}

func withProvisioner(resources []k8sresource.Resource) []Interface {
func withProvisioner(kubeCli kubernetes.Interface, resources []k8sresource.Resource) []Interface {
groupByProvisioner := make(map[string][]k8sresource.Resource)

// group volumes by provisioner
for _, resource := range resources {
provisioner, err := getProvisioner(resource)
provisioner, err := getProvisioner(kubeCli, resource)
if err != nil {
log.Warningf("unable to group. %s", err)
}
Expand All @@ -97,7 +97,7 @@ func withProvisioner(resources []k8sresource.Resource) []Interface {
return groups
}

func getProvisioner(resource k8sresource.Resource) (string, error) {
func getProvisioner(kubeCli kubernetes.Interface, resource k8sresource.Resource) (string, error) {
switch resource.GetKind() {
case k8sresource.PersistentVolumeGVK.Kind:
pv := new(corev1.PersistentVolume)
Expand All @@ -115,13 +115,34 @@ func getProvisioner(resource k8sresource.Resource) (string, error) {
resource.GetName())
}
return *snapshot.Spec.SnapshotProvider, nil
case k8sresource.PersistentVolumeClaimGVK.Kind:
pvc := new(corev1.PersistentVolumeClaim)
err := k8sresource.FromResource(resource, pvc)
if err != nil {
return "", fmt.Errorf("unable to translate resource[%s] to PersistentVolumeClaim",
resource.GetName())
}
pv, err := kubeCli.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return "", err
}

return utils.VolumeProvisioner(pv), nil
default:
log.Warningf("Invalid kind[%s] for volume grouping", resource.GetKind())
return "", fmt.Errorf("invalid kind[%s] for volume grouping", resource.GetKind())
}
}

func (f *factory) ByPV(resources []k8sresource.Resource, groupings ...groupFunc) ([]Interface, error) {
// ByPVC converts the given PersistentVolumeClaims into generic k8s resources
// and groups them with the supplied grouping functions (e.g. WithProvisioner).
// It returns an error if any PVC fails conversion.
func (f *factory) ByPVC(pvcs []*corev1.PersistentVolumeClaim, groupings ...groupFunc) ([]Interface, error) {
	// Pre-size the slice: exactly one resource per PVC on success.
	resources := make([]k8sresource.Resource, 0, len(pvcs))
	for _, pvc := range pvcs {
		resource, err := k8sresource.ToResource(pvc)
		if err != nil {
			return nil, err
		}
		resources = append(resources, resource)
	}
	return f.group(resources, groupings...)
}

Expand All @@ -144,7 +165,7 @@ func (f *factory) group(resources []k8sresource.Resource, groupings ...groupFunc
for _, grouping := range groupings {
newGroup := make([]Interface, 0)
for _, group := range groups {
newGroup = append(newGroup, grouping(group.GetResources())...)
newGroup = append(newGroup, grouping(f.kubeClient, group.GetResources())...)
}
groups = newGroup
}
Expand Down

0 comments on commit 9895df7

Please sign in to comment.