-
Notifications
You must be signed in to change notification settings - Fork 351
/
Copy pathmain.go
125 lines (109 loc) · 3.98 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"os"
"os/signal"
"syscall"
log "github.com/Sirupsen/logrus"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
myresourceclientset "github.com/trstringer/k8s-controller-custom-resource/pkg/client/clientset/versioned"
myresourceinformer_v1 "github.com/trstringer/k8s-controller-custom-resource/pkg/client/informers/externalversions/myresource/v1"
)
// getKubernetesClient builds the clients used to reach the cluster from
// outside of it, reading the kubeconfig at `$HOME/.kube/config`.
//
// It returns the core Kubernetes clientset and the generated clientset for
// the custom resource. Every failure is reported through log.Fatalf with a
// message naming the step that failed, so the three failure modes can be
// told apart in the logs.
func getKubernetesClient() (kubernetes.Interface, myresourceclientset.Interface) {
	// construct the path to resolve to `~/.kube/config`
	kubeConfigPath := os.Getenv("HOME") + "/.kube/config"

	// create the config from the path
	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		log.Fatalf("getKubernetesClient: loading kubeconfig %q: %v", kubeConfigPath, err)
	}

	// generate the core client based off of the config
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("getKubernetesClient: creating kubernetes clientset: %v", err)
	}

	// generate the custom resource client from the same config
	myresourceClient, err := myresourceclientset.NewForConfig(config)
	if err != nil {
		log.Fatalf("getKubernetesClient: creating myresource clientset: %v", err)
	}

	log.Info("Successfully constructed k8s client")
	return client, myresourceClient
}
// main wires together the custom resource informer, the work queue and the
// Controller, starts the controller loop in a goroutine, and then blocks
// until the process receives SIGTERM or SIGINT.
func main() {
	// get the Kubernetes client for connectivity
	client, myresourceClient := getKubernetesClient()

	// retrieve our custom resource informer which was generated from
	// the code generator and pass it the custom resource client, specifying
	// we should be looking through all namespaces for listing and watching
	informer := myresourceinformer_v1.NewMyResourceInformer(
		myresourceClient,
		meta_v1.NamespaceAll,
		0, // NOTE(review): presumably the resync period — confirm against the generated informer
		cache.Indexers{},
	)

	// create a new queue so that when the informer gets a resource that is either
	// a result of listing or watching, we can add an identifying key to the queue
	// so that it can be handled in the handler
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// enqueue is shared by all three event handlers. When the key could not be
	// derived it logs the error and queues nothing, instead of logging an
	// empty key and silently dropping the error; otherwise it logs the event
	// and adds the key to the queue for the handler to get.
	enqueue := func(event, key string, err error) {
		if err != nil {
			log.Errorf("%s myresource: unable to derive key: %v", event, err)
			return
		}
		log.Infof("%s myresource: %s", event, key)
		queue.Add(key)
	}

	// add event handlers to handle the three types of events for resources:
	// - adding new resources
	// - updating existing resources
	// - deleting resources
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// convert the resource object into a key (in this case
			// we are just doing it in the format of 'namespace/name')
			key, err := cache.MetaNamespaceKeyFunc(obj)
			enqueue("Add", key, err)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// only the new state of the object is used to build the key
			key, err := cache.MetaNamespaceKeyFunc(newObj)
			enqueue("Update", key, err)
		},
		DeleteFunc: func(obj interface{}) {
			// DeletionHandlingMetaNamespaceKeyFunc is a helper function that allows
			// us to check the DeletedFinalStateUnknown existence in the event that
			// a resource was deleted but it is still contained in the index
			//
			// this then in turn calls MetaNamespaceKeyFunc
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			enqueue("Delete", key, err)
		},
	})

	// construct the Controller object which has all of the necessary components to
	// handle logging, connections, informing (listing and watching), the queue,
	// and the handler
	controller := Controller{
		logger:    log.NewEntry(log.New()),
		clientset: client,
		informer:  informer,
		queue:     queue,
		handler:   &TestHandler{},
	}

	// use a channel to synchronize the finalization for a graceful shutdown;
	// it is closed when main returns, i.e. after a termination signal arrives
	stopCh := make(chan struct{})
	defer close(stopCh)

	// run the controller loop to process items
	go controller.Run(stopCh)

	// use a channel to handle OS signals to terminate and gracefully shut
	// down processing; the buffer of 1 lets a signal be delivered even if it
	// arrives before main reaches the receive below
	sigTerm := make(chan os.Signal, 1)
	signal.Notify(sigTerm, syscall.SIGTERM, syscall.SIGINT)
	<-sigTerm
}