Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create coredns service for tenants #29

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 91 additions & 52 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"encoding/base64"
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"time"

Expand All @@ -36,8 +34,9 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
typedrbacv1 "k8s.io/client-go/kubernetes/typed/rbac/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
Expand All @@ -46,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/apis/rbac"
rbacv1helpers "k8s.io/kubernetes/pkg/apis/rbac/v1"

tenantv1a1 "github.com/kubewharf/kubezoo/pkg/apis/tenant/v1alpha1"
"github.com/kubewharf/kubezoo/pkg/dynamic"
tenantclient "github.com/kubewharf/kubezoo/pkg/generated/clientset/versioned/typed/tenant/v1alpha1"
tenantlister "github.com/kubewharf/kubezoo/pkg/generated/listers/tenant/v1alpha1"
Expand Down Expand Up @@ -80,66 +80,71 @@ type TenantController struct {
tenantInformer cache.SharedIndexInformer
tenantLister tenantlister.TenantLister
tenantClient tenantclient.TenantV1alpha1Interface
upstreamCoreClient v1.CoreV1Interface
upstreamRbacClient rbacclient.RbacV1Interface
upstreamAppsClient typedappsv1.AppsV1Interface
upstreamCoreClient typedcorev1.CoreV1Interface
upstreamRbacClient typedrbacv1.RbacV1Interface
upstreamDiscoveryClient *discovery.DiscoveryClient
upstreamDynamicClient dynamic.Interface
upstreamCRDClient *apiextensions.Clientset
clientCAFile string
clientCAKeyFile string
kubeZooHostAddress string
kubeZooBindAddress string
kubeZooSecurePort int
}

// newTenantController create a controller to handler the events of tenant.
func newTenantController(ti cache.SharedIndexInformer, tenantCli tenantclient.TenantV1alpha1Interface, coreCli v1.CoreV1Interface, rbacCli rbacclient.RbacV1Interface, discoveryCli *discovery.DiscoveryClient, dynamicCli dynamic.Interface, crdClient *apiextensions.Clientset, clientCAFile, clientCAKeyFile, kubeZooBindAddress string, kubeZooSecurePort int) *TenantController {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var (
newEvent Event
err error
)
ti.AddEventHandler(cache.ResourceEventHandlerFuncs{
func setupEventHandler(tc *TenantController) {
tc.tenantInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
newEvent.tenantId, err = cache.MetaNamespaceKeyFunc(obj)
tenantId, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
newEvent.eventType = Add
queue.Add(newEvent)
tc.queue.Add(Event{tenantId: tenantId, eventType: Add})
}
},
UpdateFunc: func(_, new interface{}) {
newEvent.tenantId, err = cache.MetaNamespaceKeyFunc(new)
tenantId, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
newEvent.eventType = Update
queue.Add(newEvent)
tc.queue.Add(Event{tenantId: tenantId, eventType: Update})
}
},
DeleteFunc: func(obj interface{}) {
newEvent.tenantId, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
tenantId, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
newEvent.eventType = Delete
queue.Add(newEvent)
tc.queue.Add(Event{tenantId: tenantId, eventType: Delete})
}
},
})
}

return &TenantController{
queue: queue,
// Run starts the tenant controller
func Run(
stopCh <-chan struct{},
ti cache.SharedIndexInformer,
tenantClient tenantclient.TenantV1alpha1Interface,
k8sClient kubernetes.Interface,
discoveryClient *discovery.DiscoveryClient,
dynamicClient dynamic.Interface,
crdClient *apiextensions.Clientset,
clientCAFile, clientCAKeyFile string,
kubeZooBindAddress string, kubeZooSecurePort int,
) {
tc := &TenantController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
tenantInformer: ti,
tenantLister: tenantlister.NewTenantLister(ti.GetIndexer()),
tenantClient: tenantCli,
upstreamCoreClient: coreCli,
upstreamRbacClient: rbacCli,
upstreamDiscoveryClient: discoveryCli,
upstreamDynamicClient: dynamicCli,
tenantClient: tenantClient,
upstreamCoreClient: k8sClient.CoreV1(),
upstreamAppsClient: k8sClient.AppsV1(),
upstreamRbacClient: k8sClient.RbacV1(),
upstreamDiscoveryClient: discoveryClient,
upstreamDynamicClient: dynamicClient,
upstreamCRDClient: crdClient,
clientCAFile: clientCAFile,
clientCAKeyFile: clientCAKeyFile,
kubeZooHostAddress: net.JoinHostPort(kubeZooBindAddress, strconv.Itoa(kubeZooSecurePort)),
kubeZooBindAddress: kubeZooBindAddress,
kubeZooSecurePort: kubeZooSecurePort,
}
}
setupEventHandler(tc)

// Run starts the tenant controller
func Run(stopCh <-chan struct{}, ti cache.SharedIndexInformer, tenantCli tenantclient.TenantV1alpha1Interface, typedCli kubernetes.Interface, discoveryCli *discovery.DiscoveryClient, dynamicCli dynamic.Interface, crdClient *apiextensions.Clientset, clientCAFile, clientCAKeyFile, kubeZooBindAddress string, kubeZooSecurePort int) {
tc := newTenantController(ti, tenantCli, typedCli.CoreV1(), typedCli.RbacV1(), discoveryCli, dynamicCli, crdClient, clientCAFile, clientCAKeyFile, kubeZooBindAddress, kubeZooSecurePort)
defer utilruntime.HandleCrash()
defer tc.queue.ShutDown()

Expand Down Expand Up @@ -171,13 +176,15 @@ func (tc *TenantController) runWorker() {

// processNextItem gets event from queue and process it.
func (tc *TenantController) processNextItem() bool {
ctx := context.TODO()

newEvent, quit := tc.queue.Get()

if quit {
return false
}
defer tc.queue.Done(newEvent)
err := tc.processItem(newEvent.(Event))
err := tc.processItem(ctx, newEvent.(Event))
if err == nil {
// No error, reset the ratelimit counters
tc.queue.Forget(newEvent)
Expand All @@ -195,11 +202,11 @@ func (tc *TenantController) processNextItem() bool {
}

// processItem processes the event according the event type.
func (tc *TenantController) processItem(e Event) error {
func (tc *TenantController) processItem(ctx context.Context, e Event) error {
// process events based on its type
switch e.eventType {
case Add, Update:
return tc.onTenantAddOrUpdate(e.tenantId)
return tc.onTenantAddOrUpdate(ctx, e.tenantId)
case Delete:
klog.Warningf("deleting tenant %v", e.tenantId)
return nil
Expand All @@ -208,7 +215,7 @@ func (tc *TenantController) processItem(e Event) error {
}

// onTenantAddOrUpdate handles the ADD or UPDATE event of a Tenant.
func (tc *TenantController) onTenantAddOrUpdate(tenantId string) error {
func (tc *TenantController) onTenantAddOrUpdate(ctx context.Context, tenantId string) error {
tenant, err := tc.tenantClient.Tenants().Get(context.TODO(), tenantId, metav1.GetOptions{})
if err != nil {
return err
Expand Down Expand Up @@ -237,7 +244,7 @@ func (tc *TenantController) onTenantAddOrUpdate(tenantId string) error {
return nil
}

return tc.syncResources(tenantId)
return tc.syncResources(ctx, tenantId, tc.kubeZooBindAddress, tc.kubeZooSecurePort)
}

// deleteResources deletes resources belonging to the tenant from the upstream cluster.
Expand Down Expand Up @@ -354,29 +361,37 @@ func (tc *TenantController) filterCRDs(clusterScopedAPIResources []metav1.APIRes
}

// syncResources sync system resources to the upstream cluster when new tenant is being created.
func (tc *TenantController) syncResources(tenantId string) error {
func (tc *TenantController) syncResources(ctx context.Context, tenantId string, zooHost string, zooPort int) error {
tenant, err := tc.tenantLister.Get(tenantId)
if err != nil {
return fmt.Errorf("get tenant from lister: %w", err)
}

if tc.tenantClient == nil || tc.upstreamCoreClient == nil || tc.upstreamRbacClient == nil {
return errors.New("Skip synchronize namespaces or RBAC resources since nil client.")
}

klog.V(4).Infof("Sync system resources for tenant %s", tenantId)
if err := syncNamespaces(tc.upstreamCoreClient, tenantId); err != nil {
return err
return fmt.Errorf("sync namespace: %w", err)
}
if err := syncClusterRoles(tc.upstreamCoreClient, tc.upstreamRbacClient, tenantId); err != nil {
return err
return fmt.Errorf("sync cluster role: %w", err)
}
if err := syncClusterRoleBindings(tc.upstreamCoreClient, tc.upstreamRbacClient, tenantId); err != nil {
return err
return fmt.Errorf("sync cluster role binding: %w", err)
}
if err := genCertAndKubeconfig(tc.tenantClient, tenantId, tc.tenantLister, tc.clientCAFile, tc.clientCAKeyFile, tc.kubeZooHostAddress); err != nil {
return err
if err := syncTenantStack(ctx, tc.upstreamCoreClient, tc.upstreamAppsClient, tenant, zooHost, zooPort); err != nil {
return fmt.Errorf("sync tenant stack: %w", err)
}
if err := genCertAndKubeconfig(tc.tenantClient, tenantId, tc.tenantLister, tc.clientCAFile, tc.clientCAKeyFile, tc.kubeZooBindAddress, tc.kubeZooSecurePort); err != nil {
return fmt.Errorf("gen cert and kubeconfig: %w", err)
}
return nil
}

// syncNamespaces synchronize the system namespaces to upstream cluster.
func syncNamespaces(coreClient v1.CoreV1Interface, tenantId string) error {
func syncNamespaces(coreClient typedcorev1.CoreV1Interface, tenantId string) error {
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease, corev1.NamespaceDefault}

for _, systemNamespace := range systemNamespaces {
Expand All @@ -401,7 +416,7 @@ func syncNamespaces(coreClient v1.CoreV1Interface, tenantId string) error {
}

// syncClusterRoles synchronize the cluster roles to upstream cluster.
func syncClusterRoles(coreClient v1.CoreV1Interface, rbacClient rbacclient.RbacV1Interface, tenantId string) error {
func syncClusterRoles(coreClient typedcorev1.CoreV1Interface, rbacClient typedrbacv1.RbacV1Interface, tenantId string) error {
if _, err := rbacClient.ClusterRoles().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"}); err != nil {
klog.Warningf("Failed to list the clusterroles %s with error %v", tenantId, err)
return err
Expand Down Expand Up @@ -458,7 +473,7 @@ func syncClusterRoles(coreClient v1.CoreV1Interface, rbacClient rbacclient.RbacV
}

// syncClusterRoleBindings synchronize the clusterrolebindings to upstream cluster.
func syncClusterRoleBindings(coreClient v1.CoreV1Interface, rbacClient rbacclient.RbacV1Interface, tenantId string) error {
func syncClusterRoleBindings(coreClient typedcorev1.CoreV1Interface, rbacClient typedrbacv1.RbacV1Interface, tenantId string) error {
if _, err := rbacClient.ClusterRoleBindings().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"}); err != nil {
klog.Warningf("Failed to list the clusterrolebindings %s with error %v", tenantId, err)
return err
Expand Down Expand Up @@ -500,9 +515,31 @@ func syncClusterRoleBindings(coreClient v1.CoreV1Interface, rbacClient rbacclien
return nil
}

// syncTenantStack syncs the tenant service stack, including coredns.
func syncTenantStack(
ctx context.Context,
coreClient typedcorev1.CoreV1Interface,
appsClient typedappsv1.AppsV1Interface,
tenant *tenantv1a1.Tenant,
zooHost string,
zooPort int,
) error {
if err := syncCoredns(ctx, coreClient, appsClient, tenant, zooHost, zooPort); err != nil {
return fmt.Errorf("sync coredns: %w", err)
}

return nil
}

// genCertAndKubeconfig signs the certificate/key and generates the kubeconfig for the tenant;
// the generated kubeconfig will be attached in the tenant's annotation.
func genCertAndKubeconfig(tenantCli tenantclient.TenantV1alpha1Interface, tenantId string, tenantlister tenantlister.TenantLister, clientCAFile, clientCAKeyFile, kubeZooHostAddress string) error {
func genCertAndKubeconfig(
tenantCli tenantclient.TenantV1alpha1Interface,
tenantId string,
tenantlister tenantlister.TenantLister,
clientCAFile, clientCAKeyFile string,
kubeZooBindAddress string, kubeZooSecurePort int,
) error {
tenant, err := tenantlister.Get(tenantId)
if err != nil {
return errors.Errorf("Error fetching object with key %s from store: %v", tenantId, err)
Expand All @@ -525,7 +562,9 @@ func genCertAndKubeconfig(tenantCli tenantclient.TenantV1alpha1Interface, tenant
klog.Warningf("fail to read CA from file(%s): %v", clientCAFile, err)
return err
}
kbcfgByts, err := util.GenKubeconfig("https://"+kubeZooHostAddress, tenantId, caCertByts, util.EncodePrivateKeyPEM(key), util.EncodeCertPEM(cert))

serverAddress := fmt.Sprintf("https://%s:%d", kubeZooBindAddress, kubeZooSecurePort)
kbcfgByts, err := util.GenKubeconfig(serverAddress, tenantId, caCertByts, util.EncodePrivateKeyPEM(key), util.EncodeCertPEM(cert))
if err != nil {
klog.Warningf("fail to generate the kubeconfig for tenant(%s): %v", tenantId, err)
return err
Expand Down
Loading