Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
controller: use a specific enqueue strategy (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienBalestra authored and Joseph-Irving committed Jan 22, 2020
1 parent df81d32 commit bc33c2a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 9 deletions.
100 changes: 91 additions & 9 deletions pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"context"

"github.com/uswitch/nidhogg/pkg/nidhogg"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand All @@ -41,24 +43,105 @@ func newReconciler(mgr manager.Manager, cfg nidhogg.HandlerConfig) reconcile.Rec
return &ReconcileNode{handler: nidhogg.NewHandler(mgr.GetClient(), mgr.GetRecorder("nidhogg"), cfg), scheme: mgr.GetScheme()}
}

type nodeEnqueue struct{}

// Update implements the interface
func (e *nodeEnqueue) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {}

// Delete implements the interface
func (e *nodeEnqueue) Delete(_ event.DeleteEvent, _ workqueue.RateLimitingInterface) {}

// Generic implements the interface
func (e *nodeEnqueue) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {}

// Create adds the node to the queue, the node is created as NotReady and without daemonset pods
func (e *nodeEnqueue) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Meta == nil {
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
}})
}

type podEnqueue struct{}

// Generic implements the interface
func (e *podEnqueue) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {}

// canAddToQueue check if the Pod is associated to a node and is a daemonset pod
func (e *podEnqueue) canAddToQueue(pod *corev1.Pod) bool {
if pod.Spec.NodeName == "" {
return false
}
owner := v1.GetControllerOf(pod)
if owner == nil {
return false
}
return owner.Kind == "DaemonSet"
}

// Create adds the node of the daemonset pod to the queue
func (e *podEnqueue) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
pod, ok := evt.Object.(*corev1.Pod)
if !ok {
return
}
if !e.canAddToQueue(pod) {
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: pod.Spec.NodeName,
}})

}

// Update adds the node of the updated daemonset pod to the queue
func (e *podEnqueue) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
pod, ok := evt.ObjectNew.(*corev1.Pod)
if !ok {
return
}
if !e.canAddToQueue(pod) {
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: pod.Spec.NodeName,
}})
}

// Delete adds the node of the deleted daemonset pod to the queue
func (e *podEnqueue) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
pod, ok := evt.Object.(*corev1.Pod)
if !ok {
return
}
if !e.canAddToQueue(pod) {
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: pod.Spec.NodeName,
}})
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New("node-controller", mgr, controller.Options{Reconciler: r})
c, err := controller.New("node-controller", mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: 1,
})
if err != nil {
return err
}

// Watch for changes to Node
err = c.Watch(&source.Kind{Type: &corev1.Node{}}, &handler.EnqueueRequestForObject{})
err = c.Watch(&source.Kind{Type: &corev1.Node{}}, &nodeEnqueue{})
if err != nil {
return err
}

err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
IsController: false,
OwnerType: &appsv1.DaemonSet{},
})
err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &podEnqueue{})
if err != nil {
return err
}
Expand Down Expand Up @@ -93,6 +176,5 @@ func (r *ReconcileNode) Reconcile(request reconcile.Request) (reconcile.Result,
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}

return r.handler.HandleNode(instance)
}
2 changes: 2 additions & 0 deletions pkg/nidhogg/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (h *Handler) HandleNode(instance *corev1.Node) (reconcile.Result, error) {
} else {
firstTimeReady = nodeCopy.Annotations[annotationFirstTimeReady]
}
} else if copy.Annotations != nil {
firstTimeReady = copy.Annotations[annotationFirstTimeReady]
}

if !reflect.DeepEqual(nodeCopy, instance) {
Expand Down

0 comments on commit bc33c2a

Please sign in to comment.