Skip to content

Commit

Permalink
fixed volume backup issues
Browse files Browse the repository at this point in the history
  • Loading branch information
AmitRoushan committed Jul 13, 2023
1 parent fb649ca commit c79f85b
Show file tree
Hide file tree
Showing 17 changed files with 942 additions and 684 deletions.
2 changes: 1 addition & 1 deletion build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ifeq ($(PRINT_HELP),y)
generated_files:
@echo "$$GENERATED_FILES_HELP_INFO"
else
generated_files: verify
generated_files:
$(MAKE) -f Makefile.generate_proto $@
endif

Expand Down
8 changes: 7 additions & 1 deletion build/Makefile.generate_proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@ build_provider_service_grpc: $(PROTOC) $(PROTOC_GEN_GO)

grpc_interfaces: build_meta_service_grpc build_volume_service_grpc build_provider_service_grpc

generated_files: grpc_interfaces
import_dependencies:
go mod vendor

remove_dependencies:
rm -rf $(HERE)/vendor

generated_files: import_dependencies grpc_interfaces remove_dependencies

.PHONY: clean
clean:
Expand Down
9 changes: 5 additions & 4 deletions framework/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package executor
import (
"context"
"fmt"
"time"

kahuscheme "github.com/soda-cdm/kahu/client/clientset/versioned/scheme"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -152,7 +153,7 @@ func (exec *executor) Install(ctx context.Context, location string) error {

switch provider.Spec.Type {
case kahuapi.ProviderTypeMetadata:
return exec.installResourceBackupper(ctx, location, bl, provider, registration)
return exec.installResourceBackupper(ctx, bl, provider, registration)
case kahuapi.ProviderTypeVolume:
return exec.installVolumeBackupper(ctx, location, bl, provider, registration)
default:
Expand Down Expand Up @@ -208,20 +209,20 @@ func (exec *executor) exist(location *kahuapi.BackupLocation, provider *kahuapi.
}

func (exec *executor) installResourceBackupper(ctx context.Context,
location string,
bl *kahuapi.BackupLocation,
provider *kahuapi.Provider,
registration *kahuapi.ProviderRegistration) error {
service := resourcebackup.NewResourceBackupService(ctx,
exec.cfg.ResourceBackup,
exec.cfg.Namespace,
location,
bl,
provider,
registration,
exec.kubeClient,
exec.kahuClient)

location := bl.Name

exec.logger.Infof("Starting resource store [%s]", location)
backupStore, err := service.Start(exec.ctx)
if err != nil {
Expand Down
105 changes: 41 additions & 64 deletions framework/executor/provisioner/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
CoreV1Client "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/soda-cdm/kahu/utils/k8sresource"
)
Expand All @@ -43,61 +45,52 @@ const (

type deploymentProvider struct {
sync.Mutex
logger *log.Entry
kubeClient kubernetes.Interface
namespace string
ctx context.Context
logger *log.Entry
// kubeClient kubernetes.Interface
appsClient appsv1.AppsV1Interface
coreClient CoreV1Client.CoreV1Interface
replicaCount int32
}

func NewDeploymentProvider(_ context.Context, namespace string, kubeClient kubernetes.Interface) Interface {
func newDeploymentProvider(ctx context.Context, kubeClient kubernetes.Interface) Interface {
provider := &deploymentProvider{
ctx: ctx,
logger: log.WithField("module", moduleName),
kubeClient: kubeClient,
namespace: namespace,
appsClient: kubeClient.AppsV1(),
coreClient: kubeClient.CoreV1(),
replicaCount: defaultReplicaCount,
}

return provider
}

func (provider *deploymentProvider) Start(key string,
templateFunc podTemplateFunc) error {
func (provider *deploymentProvider) Start(workloadIndex string, namespace string, podTemplate *corev1.PodTemplateSpec) error {
provider.Lock()
defer provider.Unlock()
deployName := provider.deployName(key)
provider.logger.Infof("Trying to start deployment workload[%s]", deployName)
provider.logger.Infof("Trying to start deployment workload[%s]", workloadIndex)
// check if deploy already exist
workload, err := provider.kubeClient.AppsV1().
Deployments(provider.namespace).
Get(context.TODO(), deployName, metav1.GetOptions{})
workload, err := provider.appsClient.Deployments(namespace).Get(context.TODO(), workloadIndex, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}

// if not available, create one
if apierrors.IsNotFound(err) {
provider.logger.Infof("Creating deployment workload[%s]", deployName)
podTemplate, err := templateFunc()
provider.logger.Infof("Creating deployment workload[%s]", workloadIndex)
deployment, err := provider.deployment(workloadIndex, namespace, podTemplate)
if err != nil {
return err
}

deployment, err := provider.deployment(key, podTemplate)
if err != nil {
return err
}

workload, err = provider.kubeClient.AppsV1().
Deployments(provider.namespace).
Create(context.TODO(), deployment, metav1.CreateOptions{})
workload, err = provider.appsClient.Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}

if *workload.Spec.Replicas == 0 {
err = provider.scaleDeployment(key, provider.replicaCount)
err = provider.scaleDeployment(workloadIndex, namespace, provider.replicaCount)
if err != nil {
return err
}
Expand All @@ -106,21 +99,19 @@ func (provider *deploymentProvider) Start(key string,
return nil
}

func (provider *deploymentProvider) Stop(key string) error {
func (provider *deploymentProvider) Stop(workloadIndex string, namespace string) error {
provider.Lock()
defer provider.Unlock()
provider.logger.Infof("Stopping deployment workload for %s", key)
provider.logger.Infof("Stopping deployment workload for %s", workloadIndex)

// scale down to zero
return provider.scaleDeployment(key, 0)
return provider.scaleDeployment(workloadIndex, namespace, 0)
}

func (provider *deploymentProvider) scaleDeployment(key string, replicas int32) error {
deploymentName := provider.deployName(key)
func (provider *deploymentProvider) scaleDeployment(workloadIndex string, namespace string, replicas int32) error {
deploymentName := workloadIndex
provider.logger.Infof("Scaling deployment[%s] to replica %d", deploymentName, replicas)
deploy, err := provider.kubeClient.AppsV1().
Deployments(provider.namespace).
Get(context.TODO(), deploymentName, metav1.GetOptions{})
deploy, err := provider.appsClient.Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -156,45 +147,41 @@ func (provider *deploymentProvider) patchDeploymentObject(cur, mod *appv1.Deploy
return cur, nil
}

return provider.kubeClient.AppsV1().Deployments(cur.Namespace).Patch(context.TODO(),
return provider.appsClient.Deployments(cur.Namespace).Patch(context.TODO(),
cur.Name,
types.StrategicMergePatchType,
patch,
metav1.PatchOptions{})
}

func (provider *deploymentProvider) Remove(key string) error {
func (provider *deploymentProvider) Remove(workloadIndex string, namespace string) error {
provider.Lock()
defer provider.Unlock()

// remove service
err := provider.kubeClient.CoreV1().
Services(provider.namespace).
Delete(context.TODO(), provider.serviceName(key), metav1.DeleteOptions{})
err := provider.coreClient.Services(namespace).Delete(context.TODO(), workloadIndex, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
return err
}

// remove deployment
err = provider.kubeClient.AppsV1().
Deployments(provider.namespace).
Delete(context.TODO(), provider.deployName(key), metav1.DeleteOptions{})
err = provider.appsClient.Deployments(namespace).Delete(context.TODO(), workloadIndex, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
return err
}

return nil
}

func (provider *deploymentProvider) deployment(key string,
func (provider *deploymentProvider) deployment(workloadIndex string, namespace string,
template *corev1.PodTemplateSpec) (*appv1.Deployment, error) {
// add pod labels same as pod selector
podSelector := provider.podSelector(key)
podSelector := provider.podSelector(workloadIndex)
template.ObjectMeta.Labels = podSelector
deployment := &appv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: provider.deployName(key),
Namespace: provider.namespace,
Name: workloadIndex,
Namespace: namespace,
},
Spec: appv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Expand All @@ -215,23 +202,15 @@ func (provider *deploymentProvider) podSelector(key string) labels.Set {
}
}

func (provider *deploymentProvider) serviceName(key string) string {
return "meta-service-" + key
}

func (provider *deploymentProvider) deployName(key string) string {
return "meta-service-" + key
}

func (provider *deploymentProvider) AddService(key string,
func (provider *deploymentProvider) AddService(workloadIndex string, namespace string,
servicePorts []corev1.ServicePort) (k8sresource.ResourceReference, error) {
provider.Lock()
defer provider.Unlock()

serviceName := provider.serviceName(key)
serviceName := workloadIndex
// check if service already exist
service, err := provider.kubeClient.CoreV1().
Services(provider.namespace).
service, err := provider.coreClient.
Services(namespace).
Get(context.TODO(), serviceName, metav1.GetOptions{})
if err == nil {
return k8sresource.ResourceReference{
Expand All @@ -242,15 +221,14 @@ func (provider *deploymentProvider) AddService(key string,
return k8sresource.ResourceReference{}, err
}

service, err = provider.kubeClient.CoreV1().
Services(provider.namespace).
service, err = provider.coreClient.Services(namespace).
Create(context.TODO(), &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: provider.namespace,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
Selector: provider.podSelector(key),
Selector: provider.podSelector(workloadIndex),
Ports: servicePorts,
},
}, metav1.CreateOptions{})
Expand All @@ -264,10 +242,9 @@ func (provider *deploymentProvider) AddService(key string,
}, err
}

func (provider *deploymentProvider) removeService(key string) error {
func (provider *deploymentProvider) removeService(key string, namespace string) error {
// remove service
err := provider.kubeClient.CoreV1().
Services(provider.namespace).
err := provider.coreClient.Services(namespace).
Delete(context.TODO(), key, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
return err
Expand Down
10 changes: 4 additions & 6 deletions framework/executor/provisioner/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ import (
corev1 "k8s.io/api/core/v1"
)

type podTemplateFunc func() (*corev1.PodTemplateSpec, error)

type Interface interface {
Start(index string, templateFunc podTemplateFunc) error
Stop(index string) error
AddService(index string, servicePorts []corev1.ServicePort) (k8sresource.ResourceReference, error)
Remove(index string) error
Start(workloadIndex string, namespace string, podTemplate *corev1.PodTemplateSpec) error
Stop(workloadIndex string, namespace string) error
AddService(workloadIndex string, namespace string, servicePorts []corev1.ServicePort) (k8sresource.ResourceReference, error)
Remove(workloadIndex string, namespace string) error
}

type Factory interface {
Expand Down
4 changes: 2 additions & 2 deletions framework/executor/provisioner/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type factory struct {
deployment Interface
}

func NewProvisionerFactory(ctx context.Context, namespace string, kubeClient kubernetes.Interface) Factory {
func NewProvisionerFactory(ctx context.Context, kubeClient kubernetes.Interface) Factory {
return &factory{
deployment: NewDeploymentProvider(ctx, namespace, kubeClient),
deployment: newDeploymentProvider(ctx, kubeClient),
}
}

Expand Down
Loading

0 comments on commit c79f85b

Please sign in to comment.