Skip to content

Commit

Permalink
feat: clean cache (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
zxh326 authored Dec 11, 2024
1 parent b0a1f5c commit e62edc7
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 10 deletions.
46 changes: 45 additions & 1 deletion internal/controller/cachegroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/juicedata/juicefs-cache-group-operator/pkg/utils"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
Expand Down Expand Up @@ -345,6 +346,10 @@ func (r *CacheGroupReconciler) removeRedundantWorkers(
log.Error(err, "failed to delete worker", "worker", worker.Name)
return err
}
if err := r.cleanWorkerCache(ctx, cg, worker); err != nil {
log.Error(err, "failed to clean worker cache", "worker", worker.Name)
return err
}
}
return nil
}
Expand Down Expand Up @@ -480,8 +485,47 @@ func (r *CacheGroupReconciler) waitForWorkerReady(ctx context.Context, cg *juice
}
}

func (r *CacheGroupReconciler) cleanWorkerCache(ctx context.Context, cg *juicefsiov1.CacheGroup, worker corev1.Pod) error {
log := log.FromContext(ctx).WithValues("worker", worker.Name)
if !cg.Spec.CleanCache {
return nil
}
job := builder.NewCleanCacheJob(*cg, worker)
if job == nil {
return nil
}
log.Info("worker is to be deleted, create job to clean cache", "job", job.Name)
err := r.Get(ctx, client.ObjectKey{Namespace: job.Namespace, Name: job.Name}, &batchv1.Job{})
if err == nil {
log.Info("clean cache job already exists", "job", job.Name)
return nil
}
if !apierrors.IsNotFound(err) {
log.Error(err, "failed to get clean cache job", "job", job.Name)
return err
}
if err := r.Create(ctx, job); err != nil {
log.Error(err, "failed to create clean cache job", "job", job.Name)
return err
}
return nil
}

func (r *CacheGroupReconciler) HandleFinalizer(ctx context.Context, cg *juicefsiov1.CacheGroup) error {
// TODO: clean cache
log := log.FromContext(ctx)
if !cg.Spec.CleanCache {
return nil
}
workers, err := r.listActualWorkers(ctx, cg)
if err != nil {
log.Error(err, "failed to list actual worker nodes")
return err
}
for _, worker := range workers {
if err := r.cleanWorkerCache(ctx, cg, worker); err != nil {
log.Error(err, "failed to clean worker cache", "worker", worker.Name)
}
}
return nil
}

Expand Down
60 changes: 60 additions & 0 deletions pkg/builder/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,63 @@ func GetWarmUpOwnerReference(wu *juicefsiov1.WarmUp) []metav1.OwnerReference {
Controller: utils.ToPtr(true),
}}
}

func NewCleanCacheJob(cg juicefsiov1.CacheGroup, worker corev1.Pod) *batchv1.Job {
cacheVolumes := []corev1.Volume{}
for _, volume := range worker.Spec.Volumes {
if strings.HasPrefix(volume.Name, common.CacheDirVolumeNamePrefix) {
cacheVolumes = append(cacheVolumes, volume)
}
}

if len(cacheVolumes) == 0 {
return nil
}

cacheVolumeMounts := []corev1.VolumeMount{}
for _, volume := range cacheVolumes {
cacheVolumeMounts = append(cacheVolumeMounts, corev1.VolumeMount{
Name: volume.Name,
MountPath: fmt.Sprintf("/var/jfsCache/%s", volume.Name),
})
}
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: common.GenCleanCacheJobName(worker.Spec.NodeName),
Namespace: worker.Namespace,
Labels: map[string]string{
common.LabelAppType: common.LableCleanCacheJobValue,
common.LabelCacheGroup: cg.Name,
},
},
Spec: batchv1.JobSpec{
Parallelism: utils.ToPtr(int32(1)),
BackoffLimit: utils.ToPtr(int32(3)),
TTLSecondsAfterFinished: utils.ToPtr(int32(60)),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
common.LabelAppType: common.LableCleanCacheJobValue,
common.LabelCacheGroup: cg.Name,
},
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Tolerations: worker.Spec.Tolerations,
NodeSelector: worker.Spec.NodeSelector,
Containers: []corev1.Container{{
Name: common.CleanCacheContainerName,
Image: worker.Spec.Containers[0].Image,
Command: []string{"/bin/sh", "-c", "rm -rf /var/jfsCache/*/" + cg.Status.FileSystem},
VolumeMounts: cacheVolumeMounts,
Resources: common.DefaultForCleanCacheResources,
}},
ServiceAccountName: worker.Spec.ServiceAccountName,
Volumes: cacheVolumes,
NodeName: worker.Spec.NodeName,
},
},
},
}
return job
}
35 changes: 26 additions & 9 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

const (
// CacheGroupContainerName is the name of cache group worker container
WorkerContainerName = "juicefs-cg-worker"
WarmUpContainerName = "juicefs-warmup"
WorkerContainerName = "juicefs-cg-worker"
WarmUpContainerName = "juicefs-warmup"
CleanCacheContainerName = "juicefs-clean-cache"
// WorkerNamePrefix is the prefix of worker name
WorkerNamePrefix = "juicefs-cg-worker"
WarmUpNamePrefix = "juicefs-warmup"
Expand All @@ -40,24 +41,36 @@ const (
DefaultCacheHostPath = "/var/jfsCache"

// label keys
LabelCacheGroup = "juicefs.io/cache-group"
LabelWorkerHash = "juicefs.io/worker-hash"
LabelWorker = "app.kubernetes.io/name"
LabelWorkerValue = "juicefs-cache-group-worker"
LabelAppType = "app.kubernetes.io/name"
LabelJobValue = "juicefs-warmup-job"
LabelCacheGroup = "juicefs.io/cache-group"
LabelWorkerHash = "juicefs.io/worker-hash"
LabelWorker = "app.kubernetes.io/name"
LabelWorkerValue = "juicefs-cache-group-worker"
LabelAppType = "app.kubernetes.io/name"
LabelJobValue = "juicefs-warmup-job"
LableCleanCacheJobValue = "juicefs-clean-cache-job"

AnnoBackupWorker = "juicefs.io/backup-worker"
AnnoWaitingDeleteWorker = "juicefs.io/waiting-delete-worker"
)

var (
DefaultResources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("5Gi"),
},
}

DefaultForCleanCacheResources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
Requests: corev1.ResourceList{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
Expand Down Expand Up @@ -86,3 +99,7 @@ func GenRoleBindingName(wuName string) string {
func GenRoleName(wuName string) string {
return fmt.Sprintf("%s-%s-role", WarmUpNamePrefix, wuName)
}

func GenCleanCacheJobName(nodeName string) string {
return fmt.Sprintf("%s-%s", CleanCacheContainerName, nodeName)
}

0 comments on commit e62edc7

Please sign in to comment.