
Commit 775d10e

Authored Mar 19, 2025
fix(sync): delete complete worker pod when progressing & add more concurrent options (#30)
1 parent ab2953b · commit 775d10e

5 files changed: +108 -32 lines

.golangci.yml (-1)

@@ -25,7 +25,6 @@ linters:
   - errcheck
   - ginkgolinter
   - goconst
-  - gocyclo
   - gofmt
   - goimports
   - gosimple

cmd/app/controller.go (+5)

@@ -25,6 +25,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 
 	"github.com/juicedata/juicefs-operator/internal/controller"
+	"github.com/juicedata/juicefs-operator/pkg/common"
+
 	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
 	// to ensure that exec-entrypoint and run can make use of them.
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
@@ -48,6 +50,9 @@ func init() {
 	fs := flag.NewFlagSet("", flag.ExitOnError)
 	zapOpts.BindFlags(fs)
 	ControllerCmd.Flags().AddGoFlagSet(fs)
+	ControllerCmd.PersistentFlags().IntVarP(&common.MaxSyncConcurrentReconciles, "max-sync-concurrent-reconciles", "", 10, "max concurrent reconciles for sync")
+	ControllerCmd.PersistentFlags().Float32VarP(&common.K8sClientQPS, "k8s-client-qps", "", 30, "QPS indicates the maximum QPS to the master from this client. Setting this to a negative value will disable client-side ratelimiting")
+	ControllerCmd.PersistentFlags().IntVarP(&common.K8sClientBurst, "k8s-client-burst", "", 20, "Maximum burst for throttle")
 }
 
 func run() {

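Note: the three new flags are registered on ControllerCmd's persistent flag set and bind straight to package-level variables in pkg/common (see the common.go change below). A minimal sketch of that pattern, not taken from this commit — the command name and local variable are hypothetical, only the flag name and default mirror the diff:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// mirrors common.MaxSyncConcurrentReconciles, default 10 as in the commit
var maxSyncConcurrentReconciles = 10

func main() {
	cmd := &cobra.Command{
		Use: "example-controller", // hypothetical command name
		Run: func(cmd *cobra.Command, args []string) {
			// In the operator this value is read later, when SetupWithManager
			// builds controller.Options{MaxConcurrentReconciles: ...}.
			fmt.Println("max sync concurrent reconciles:", maxSyncConcurrentReconciles)
		},
	}
	cmd.PersistentFlags().IntVar(&maxSyncConcurrentReconciles,
		"max-sync-concurrent-reconciles", 10, "max concurrent reconciles for sync")
	_ = cmd.Execute()
}

Because the flags are persistent rather than local, they also apply to any subcommands of the controller command.
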
cmd/app/manager.go (+6 -1)

@@ -19,6 +19,7 @@ package app
 import (
 	"crypto/tls"
 
+	"github.com/juicedata/juicefs-operator/pkg/common"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -68,7 +69,11 @@ func NewManager() (ctrl.Manager, error) {
 		// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/metrics/filters#WithAuthenticationAndAuthorization
 		metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
 	}
-	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+
+	cfg := ctrl.GetConfigOrDie()
+	cfg.QPS = common.K8sClientQPS
+	cfg.Burst = common.K8sClientBurst
+	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
 		Scheme:                 scheme,
 		Metrics:                metricsServerOptions,
 		HealthProbeBindAddress: probeAddr,

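Note: QPS and Burst on the rest.Config throttle every request the manager's clients make to the API server, and the flag help text above notes that a negative QPS disables client-side rate limiting. A minimal sketch of what the two fields feed, not taken from this commit — the host is a placeholder, and client-go derives a token-bucket limiter roughly like this from them:

package main

import (
	"fmt"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	cfg := &rest.Config{
		Host:  "https://127.0.0.1:6443", // placeholder API server address
		QPS:   30,                       // commit default for --k8s-client-qps
		Burst: 20,                       // commit default for --k8s-client-burst
	}
	// QPS is the sustained request rate, Burst the short-term allowance.
	limiter := flowcontrol.NewTokenBucketRateLimiter(cfg.QPS, cfg.Burst)
	fmt.Printf("client-side limit: %.0f req/s, burst %d\n", limiter.QPS(), cfg.Burst)
}

Raising these values lets a controller with many concurrent reconciles talk to the API server faster, at the cost of more load on it.
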
internal/controller/sync_controller.go (+92 -30)

@@ -119,16 +119,20 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 	}
 
 	builder := builder.NewSyncPodBuilder(sync, from, to)
+	l.Info("start to prepare sync worker pods", "replicas", sync.Spec.Replicas)
 	if err := r.prepareWorkerPod(ctx, sync, builder); err != nil {
 		l.Error(err, "failed to prepare worker pod")
 		return ctrl.Result{}, err
 	}
+	l.Info("prepare worker pod done", "replicas", sync.Spec.Replicas)
 
+	l.Info("start to prepare sync manager pod")
 	// prepare manager pod
 	if err := r.prepareManagerPod(ctx, sync, builder); err != nil {
 		l.Error(err, "failed to prepare manager pod")
 		return ctrl.Result{}, err
 	}
+	l.Info("prepare manager pod done, ready to sync")
 
 	sync.Status.StartAt = &metav1.Time{Time: time.Now()}
 	sync.Status.Phase = juicefsiov1.SyncPhaseProgressing
@@ -139,13 +143,11 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 
 	if sync.Status.Phase == juicefsiov1.SyncPhaseCompleted {
 		// delete worker pod
-		labelSelector := client.MatchingLabels{
-			common.LabelSync:    sync.Name,
-			common.LabelAppType: common.LabelSyncWorkerValue,
-		}
-		if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(sync.Namespace), labelSelector); err != nil {
+		if err := r.deleteWorkerPods(ctx, sync, true); err != nil {
+			l.Error(err, "failed to delete worker pods")
 			return ctrl.Result{}, err
 		}
+
 		if sync.Spec.TTLSecondsAfterFinished != nil {
 			completedAt := sync.Status.CompletedAt
 			if completedAt == nil {
@@ -163,29 +165,40 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 	}
 
 	if sync.Status.Phase == juicefsiov1.SyncPhaseFailed {
-		labelSelector := client.MatchingLabels{
-			common.LabelSync:    sync.Name,
-			common.LabelAppType: common.LabelSyncWorkerValue,
-		}
-		if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(sync.Namespace), labelSelector); err != nil {
-			return ctrl.Result{}, client.IgnoreNotFound(err)
+		if err := r.deleteWorkerPods(ctx, sync, true); err != nil {
+			l.Error(err, "failed to delete worker pods")
+			return ctrl.Result{}, err
 		}
-		return ctrl.Result{}, nil
 	}
 
 	if sync.Status.Phase == juicefsiov1.SyncPhaseProgressing {
 		// get manager pod
 		managerPod := &corev1.Pod{}
 		if err := r.Get(ctx, client.ObjectKey{Namespace: sync.Namespace, Name: common.GenSyncManagerName(sync.Name)}, managerPod); err != nil {
-			return ctrl.Result{}, client.IgnoreNotFound(err)
+			if apierrors.IsNotFound(err) {
+				sync.Status.Phase = juicefsiov1.SyncPhaseFailed
+				sync.Status.Reason = "manager pod not found"
+				return ctrl.Result{}, r.Status().Update(ctx, sync)
+			}
+			l.Error(err, "failed to get manager pod")
+			return ctrl.Result{}, err
 		}
-		status, err := r.calculateSyncStats(ctx, sync, managerPod)
-		if err != nil {
+
+		// delete worker completed pod
+		if err := r.deleteWorkerPods(ctx, sync, false); err != nil {
+			l.Error(err, "failed to delete worker pods")
 			return ctrl.Result{}, err
 		}
+
+		status, err := r.calculateSyncStats(ctx, sync, managerPod)
 		if !reflect.DeepEqual(sync.Status, status) {
 			sync.Status = status
-			return ctrl.Result{RequeueAfter: 3 * time.Second}, utils.IgnoreConflict(r.Status().Update(ctx, sync))
+			if err := r.Status().Update(ctx, sync); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+		if err != nil {
+			return ctrl.Result{}, err
 		}
 		return ctrl.Result{RequeueAfter: 3 * time.Second}, nil
 	}
@@ -197,22 +210,24 @@ func (r *SyncReconciler) calculateSyncStats(ctx context.Context, sync *juicefsio
 	l := log.FromContext(ctx)
 	status := sync.Status
 	if managerPod.Status.Phase == corev1.PodSucceeded || managerPod.Status.Phase == corev1.PodFailed {
+		if managerPod.Status.Phase == corev1.PodFailed {
+			status.Phase = juicefsiov1.SyncPhaseFailed
+		} else {
+			status.Phase = juicefsiov1.SyncPhaseCompleted
+		}
+		status.CompletedAt = &metav1.Time{Time: time.Now()}
 		finishLog, err := utils.LogPod(ctx, sync.Namespace, common.GenSyncManagerName(sync.Name), common.SyncNamePrefix, 5)
 		if err != nil {
+			status.Reason = "failed to get manager pod last logs\nerror: " + err.Error()
 			l.Error(err, "failed to get manager pod last logs")
 			return status, err
 		}
 		if len(finishLog) > 0 {
 			status.FinishLog = finishLog
 		}
-		if managerPod.Status.Phase == corev1.PodFailed {
-			status.Phase = juicefsiov1.SyncPhaseFailed
-		} else {
-			status.Phase = juicefsiov1.SyncPhaseCompleted
-		}
-		status.CompletedAt = &metav1.Time{Time: time.Now()}
 		statsMap, err := utils.ParseLog(status.FinishLog)
 		if err != nil {
+			status.Reason = "failed to parse log\nerror: " + err.Error()
 			l.Error(err, "failed to parse log")
 		} else {
 			stats := juicefsiov1.SyncStats{}
@@ -347,7 +362,7 @@ func (r *SyncReconciler) prepareWorkerPod(ctx context.Context, sync *juicefsiov1
 			}
 		}
 		if len(ips) == int(*sync.Spec.Replicas)-1 {
-			log.V(1).Info("sync worker pod ready", "ips", ips)
+			log.Info("sync worker pod ready", "ips", ips)
 			builder.UpdateWorkerIPs(ips)
 			return nil
 		}
@@ -356,15 +371,63 @@ func (r *SyncReconciler) prepareWorkerPod(ctx context.Context, sync *juicefsiov1
 	}
 }
 
+func (r *SyncReconciler) deleteWorkerPods(ctx context.Context, sync *juicefsiov1.Sync, all bool) error {
+	labelSelector := client.MatchingLabels{
+		common.LabelSync:    sync.Name,
+		common.LabelAppType: common.LabelSyncWorkerValue,
+	}
+	var fieldSelector client.MatchingFields
+	if !all {
+		fieldSelector = client.MatchingFields{
+			"status.phase": string(corev1.PodSucceeded),
+		}
+	}
+	return client.IgnoreNotFound(
+		r.DeleteAllOf(ctx, &corev1.Pod{},
+			client.InNamespace(sync.Namespace),
+			labelSelector,
+			fieldSelector,
+		))
+}
+
 func (r *SyncReconciler) prepareManagerPod(ctx context.Context, sync *juicefsiov1.Sync, builder *builder.SyncPodBuilder) error {
 	managerPod := builder.NewManagerPod()
-	if err := r.Get(ctx, client.ObjectKey{Namespace: sync.Namespace, Name: managerPod.Name}, &corev1.Pod{}); err != nil {
-		if apierrors.IsNotFound(err) {
-			return r.Create(ctx, managerPod)
-		}
+	err := r.Get(ctx, client.ObjectKey{Namespace: sync.Namespace, Name: managerPod.Name}, &corev1.Pod{})
+	if err != nil && !apierrors.IsNotFound(err) {
 		return err
 	}
-	return nil
+	if apierrors.IsNotFound(err) {
+		if err := r.Create(ctx, managerPod); err != nil {
+			return err
+		}
+	}
+	// waiting for manager pod ready
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for manager pod ready")
+		default:
+			err := r.Get(ctx, client.ObjectKey{Namespace: sync.Namespace, Name: managerPod.Name}, managerPod)
+			if err != nil {
+				if apierrors.IsNotFound(err) {
+					time.Sleep(1 * time.Second)
+					continue
+				}
+				return err
+			}
+			if utils.IsPodReady(*managerPod) {
+				log.FromContext(ctx).Info("sync manager pod ready")
+				return nil
+			}
+			// It may have failed/succeeded immediately after starting; return success in that case as well.
+			if managerPod.Status.Phase == corev1.PodFailed || managerPod.Status.Phase == corev1.PodSucceeded {
+				return nil
+			}
+			time.Sleep(5 * time.Second)
+		}
+	}
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -373,8 +436,7 @@ func (r *SyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		For(&juicefsiov1.Sync{}).
 		Owns(&corev1.Pod{}).
 		WithOptions(controller.Options{
-			// TODO: configable
-			MaxConcurrentReconciles: 5,
+			MaxConcurrentReconciles: common.MaxSyncConcurrentReconciles,
 		}).
 		Named("sync").
 		Complete(r)

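Note: deleteWorkerPods pairs the worker label selector with a status.phase field selector, so during SyncPhaseProgressing (all == false) only Succeeded worker pods are removed while running workers keep serving the sync. A standalone sketch of the same call shape, not taken from this commit — the helper name, namespace argument, and label are hypothetical:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// deleteSucceededPods mirrors the selector combination deleteWorkerPods
// uses when all == false: the labels narrow the delete to worker pods,
// the field selector narrows it to pods whose phase is Succeeded.
func deleteSucceededPods(ctx context.Context, c client.Client, ns string) error {
	return client.IgnoreNotFound(c.DeleteAllOf(ctx, &corev1.Pod{},
		client.InNamespace(ns),
		client.MatchingLabels{"app.kubernetes.io/component": "sync-worker"}, // placeholder label
		client.MatchingFields{"status.phase": string(corev1.PodSucceeded)},
	))
}

As far as I can tell, DeleteAllOf is a write and goes to the API server as a delete-collection request, so the field selector is evaluated server-side (status.phase is one of the pod field selectors the API server accepts) rather than against the informer cache.
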
pkg/common/common.go (+5)

@@ -90,6 +90,11 @@ var (
 
 	DefaultBackupWorkerDuration = 10 * time.Minute
 	DefaultWaitingMaxDuration   = 1 * time.Hour
+
+	MaxSyncConcurrentReconciles = 10
+
+	K8sClientQPS   float32 = 30
+	K8sClientBurst int     = 20
 )
 
 func GenWorkerName(cgName string, nodeName string) string {