Skip to content

Commit 73c7232

Browse files
authored
Merge pull request #14 from alauda/cr-queue
Use workqueue for chartrepo
2 parents 367156d + 83f0616 commit 73c7232

File tree

3 files changed

+129
-10
lines changed

3 files changed

+129
-10
lines changed

pkg/controller/chartrepo.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/alauda/captain/pkg/util"
1010
"github.com/alauda/helm-crds/pkg/apis/app/v1alpha1"
1111
"helm.sh/helm/pkg/repo"
12+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1213
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/apimachinery/pkg/runtime/schema"
1415
"k8s.io/apimachinery/pkg/types"
@@ -131,7 +132,9 @@ func (c *Controller) createCharts(cr *v1alpha1.ChartRepo) error {
131132
klog.Infof("chart %s/%s not found, create", cr.GetName(), name)
132133
_, err = c.appClientSet.AppV1alpha1().Charts(cr.GetNamespace()).Create(chart)
133134
if err != nil {
134-
return err
135+
if !apierrors.IsAlreadyExists(err) {
136+
return err
137+
}
135138
}
136139
continue
137140
}

pkg/controller/controller.go

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ type Controller struct {
8484
helmRequestSynced cache.InformerSynced
8585

8686
chartRepoSynced cache.InformerSynced
87+
chartRepoLister listers.ChartRepoLister
8788

8889
// ClusterCache is used to store Cluster resource
8990
ClusterCache *commoncache.Cache
@@ -93,7 +94,8 @@ type Controller struct {
9394
// means we can ensure we only process a fixed amount of resources at a
9495
// time, and makes it easy to ensure we are never processing the same item
9596
// simultaneously in two different workers.
96-
workQueue workqueue.RateLimitingInterface
97+
workQueue workqueue.RateLimitingInterface
98+
chartRepoWorkQueue workqueue.RateLimitingInterface
9799
// recorder is an event recorder for recording Event resources to the
98100
// Kubernetes API.
99101
recorder record.EventRecorder
@@ -132,12 +134,14 @@ func NewController(mgr manager.Manager, opt *config.Options, stopCh <-chan struc
132134
clusterClient: clusterClient,
133135
globalClusterName: opt.GlobalClusterName,
134136
},
135-
restConfig: cfg,
136-
recorder: mgr.GetEventRecorderFor(util.ComponentName),
137-
helmRequestLister: informer.Lister(),
138-
helmRequestSynced: informer.Informer().HasSynced,
139-
chartRepoSynced: repoInformer.Informer().HasSynced,
140-
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "HelmRequests"),
137+
restConfig: cfg,
138+
recorder: mgr.GetEventRecorderFor(util.ComponentName),
139+
helmRequestLister: informer.Lister(),
140+
chartRepoLister: repoInformer.Lister(),
141+
helmRequestSynced: informer.Informer().HasSynced,
142+
chartRepoSynced: repoInformer.Informer().HasSynced,
143+
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "HelmRequests"),
144+
chartRepoWorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRepos"),
141145
// refresh frequently
142146
ClusterCache: commoncache.New(1*time.Minute, 5*time.Minute),
143147
}
@@ -173,6 +177,7 @@ func (c *Controller) GetClusterClient() clusterclientset.Interface {
173177
func (c *Controller) Start(stopCh <-chan struct{}) error {
174178
defer utilruntime.HandleCrash()
175179
defer c.workQueue.ShutDown()
180+
defer c.chartRepoWorkQueue.ShutDown()
176181

177182
// Start the informer factories to begin populating the informer caches
178183
klog.Info("Starting HelmRequest controller")
@@ -187,6 +192,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
187192
// Launch two workers to process HelmRequest resources
188193
for i := 0; i < 2; i++ {
189194
go wait.Until(c.runWorker, time.Second, stopCh)
195+
go wait.Until(c.runChartRepoWorker, time.Second, stopCh)
190196
}
191197

192198
klog.Info("Started workers")
@@ -204,6 +210,114 @@ func (c *Controller) runWorker() {
204210
}
205211
}
206212

213+
func (c *Controller) runChartRepoWorker() {
214+
for c.processNextChartRepo() {
215+
}
216+
}
217+
218+
// processNextWorkItem will read a single work item off the chartRepoWorkQueue and
219+
// attempt to process it, by calling the syncHandler.
220+
func (c *Controller) processNextChartRepo() bool {
221+
obj, shutdown := c.chartRepoWorkQueue.Get()
222+
223+
if shutdown {
224+
return false
225+
}
226+
227+
// We wrap this block in a func so we can defer c.chartRepoWorkQueue.Done.
228+
err := func(obj interface{}) error {
229+
// We call Done here so the chartRepoWorkQueue knows we have finished
230+
// processing this item. We also must remember to call Forget if we
231+
// do not want this work item being re-queued. For example, we do
232+
// not call Forget if a transient error occurs, instead the item is
233+
// put back on the chartRepoWorkQueue and attempted again after a back-off
234+
// period.
235+
defer c.chartRepoWorkQueue.Done(obj)
236+
var key string
237+
var ok bool
238+
// We expect strings to come off the chartRepoWorkQueue. These are of the
239+
// form namespace/name. We do this as the delayed nature of the
240+
// chartRepoWorkQueue means the items in the informer cache may actually be
241+
// more up to date that when the item was initially put onto the
242+
// chartRepoWorkQueue.
243+
if key, ok = obj.(string); !ok {
244+
// As the item in the chartRepoWorkQueue is actually invalid, we call
245+
// Forget here else we'd go into a loop of attempting to
246+
// process a work item that is invalid.
247+
c.chartRepoWorkQueue.Forget(obj)
248+
utilruntime.HandleError(fmt.Errorf("expected string in chartRepoWorkQueue but got %#v", obj))
249+
return nil
250+
}
251+
// Start the syncHandler, passing it the namespace/name string of the
252+
// HelmRequest resource to be synced.
253+
if err := c.syncChartRepoHandler(key); err != nil {
254+
// Put the item back on the chartRepoWorkQueue to handle any transient errors.
255+
c.chartRepoWorkQueue.AddRateLimited(key)
256+
return fmt.Errorf("error syncing chartrepo '%s': %s, requeuing", key, err.Error())
257+
}
258+
// Finally, if no error occurs we Forget this item so it does not
259+
// get queued again until another change happens.
260+
c.chartRepoWorkQueue.Forget(obj)
261+
klog.Infof("Successfully synced '%s'", key)
262+
return nil
263+
}(obj)
264+
265+
if err != nil {
266+
utilruntime.HandleError(err)
267+
return true
268+
}
269+
270+
return true
271+
}
272+
273+
// syncHandler compares the actual state with the desired, and attempts to
274+
// converge the two. It then updates the Status block of the HelmRequest resource
275+
// with the current status of the resource.
276+
func (c *Controller) syncChartRepoHandler(key string) error {
277+
// Convert the namespace/name string into a distinct namespace and name
278+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
279+
if err != nil {
280+
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
281+
return nil
282+
}
283+
klog.V(9).Infof("")
284+
285+
// Get the HelmRequest resource with this namespace/name
286+
chartRepo, err := c.chartRepoLister.ChartRepos(namespace).Get(name)
287+
if err != nil {
288+
// The HelmRequest resource may no longer exist, in which case we stop
289+
// processing.
290+
if errors.IsNotFound(err) {
291+
utilruntime.HandleError(fmt.Errorf("repo '%s' in work queue no longer exists", key))
292+
return nil
293+
}
294+
295+
return err
296+
}
297+
298+
if !chartRepo.DeletionTimestamp.IsZero() {
299+
klog.Infof("ChartRepo has not nil DeletionTimestamp, starting to delete it: %s", chartRepo.Name)
300+
return nil
301+
}
302+
303+
c.syncChartRepo(chartRepo)
304+
305+
return nil
306+
}
307+
308+
// enqueueHelmRequest takes a HelmRequest resource and converts it into a namespace/name
309+
// string which is then put onto the work queue. This method should *not* be
310+
// passed resources of any type other than HelmRequest.
311+
func (c *Controller) enqueueChartRepo(obj interface{}) {
312+
var key string
313+
var err error
314+
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
315+
utilruntime.HandleError(err)
316+
return
317+
}
318+
c.chartRepoWorkQueue.Add(key)
319+
}
320+
207321
// processNextWorkItem will read a single work item off the workQueue and
208322
// attempt to process it, by calling the syncHandler.
209323
func (c *Controller) processNextWorkItem() bool {

pkg/controller/eventhandler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ func (c *Controller) newChartRepoHandler() cache.ResourceEventHandler {
2020
klog.V(2).Info("receive new chartrepo")
2121

2222
// TODO: use a queue
23-
go c.syncChartRepo(new)
23+
//go c.syncChartRepo(new)
24+
25+
c.enqueueChartRepo(new)
2426

2527
//if oldChartRepo.Spec.Secret != nil && newChartRepo.Spec.Secret != nil {
2628
// if oldChartRepo.Spec.Secret.Name != newChartRepo.Spec.Secret.Name {
@@ -45,7 +47,7 @@ func (c *Controller) newChartRepoHandler() cache.ResourceEventHandler {
4547
}
4648

4749
funcs := cache.ResourceEventHandlerFuncs{
48-
AddFunc: c.syncChartRepo,
50+
AddFunc: c.enqueueChartRepo,
4951
UpdateFunc: updateFunc,
5052
DeleteFunc: deleteFunc,
5153
}

0 commit comments

Comments
 (0)