diff --git a/cmd/kiosk/main.go b/cmd/kiosk/main.go index fbb9bfce..f241bf31 100644 --- a/cmd/kiosk/main.go +++ b/cmd/kiosk/main.go @@ -34,7 +34,9 @@ import ( "github.com/loft-sh/kiosk/pkg/util/certhelper" "github.com/loft-sh/kiosk/pkg/util/log" "github.com/loft-sh/kiosk/pkg/util/secret" + "github.com/loft-sh/kiosk/pkg/watch" "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/klog" @@ -171,6 +173,16 @@ func main() { // Make sure the manager is synced mgr.GetCache().WaitForCacheSync(ctx) + // Start watch registries + err = watch.NamespaceRegistry.Start(ctx, mgr.GetCache(), &corev1.Namespace{}) + if err != nil { + klog.Fatalf("start namespace informer: %v", err) + } + err = watch.AccountRegistry.Start(ctx, mgr.GetCache(), &configv1alpha1.Account{}) + if err != nil { + klog.Fatalf("start account informer: %v", err) + } + // Start the api server go func() { if os.Getenv("SERVER_SIDE_APPLY_ENABLED") != "true" { diff --git a/devspace.yaml b/devspace.yaml index 05037e0b..c9bfd733 100644 --- a/devspace.yaml +++ b/devspace.yaml @@ -1,4 +1,4 @@ -version: v1beta7 +version: v1beta10 images: controller: image: kiosksh/kiosktest @@ -44,14 +44,12 @@ profiles: path: deployments[0].helm.values.kiosk.resources value: limits: - memory: 100Gi - cpu: "1000" + memory: 4Gi + cpu: "4" replace: dev: - interactive: - defaultEnabled: true - terminal: - imageName: controller + terminal: + imageName: controller sync: - imageName: controller excludePaths: diff --git a/pkg/apiserver/registry/account/rest.go b/pkg/apiserver/registry/account/rest.go index 12642a20..f4f0d7ae 100644 --- a/pkg/apiserver/registry/account/rest.go +++ b/pkg/apiserver/registry/account/rest.go @@ -23,28 +23,35 @@ import ( config "github.com/loft-sh/kiosk/pkg/apis/config/v1alpha1" "github.com/loft-sh/kiosk/pkg/apis/tenancy" "github.com/loft-sh/kiosk/pkg/authorization" + kioskwatch "github.com/loft-sh/kiosk/pkg/watch" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "sigs.k8s.io/controller-runtime/pkg/client" "time" ) type accountREST struct { - client client.Client - filter authorization.FilteredLister + client client.Client + filter authorization.FilteredLister + authorizer authorizer.Authorizer } // NewAccountREST creates a new account storage that implements the rest interface func NewAccountREST(cachedClient client.Client, uncachedClient client.Client, scheme *runtime.Scheme) rest.Storage { ruleClient := authorization.NewRuleClient(cachedClient) + authorizer := rbac.New(ruleClient, ruleClient, ruleClient, ruleClient) return &accountREST{ - client: uncachedClient, - filter: authorization.NewFilteredLister(uncachedClient, rbac.New(ruleClient, ruleClient, ruleClient, ruleClient)), + client: uncachedClient, + authorizer: authorizer, + filter: authorization.NewFilteredLister(uncachedClient, authorizer), } } @@ -135,6 +142,27 @@ func (r *accountREST) List(ctx context.Context, options *metainternalversion.Lis return accountList, nil } +var _ = rest.Watcher(&accountREST{}) + +func (r *accountREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + userInfo, ok := request.UserFrom(ctx) + if !ok { + return nil, fmt.Errorf("user is missing in context") + } + if options == nil { + options = &metainternalversion.ListOptions{} + } + + w := &watcher{ + userInfo: userInfo, + labelSelector: options.LabelSelector, + authorizer: r.authorizer, + result: make(chan watch.Event), + } + kioskwatch.AccountRegistry.Subscribe(w) + return w, nil +} + var _ = rest.Getter(&accountREST{}) func (r *accountREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { diff --git a/pkg/apiserver/registry/account/watcher.go b/pkg/apiserver/registry/account/watcher.go new file mode 100644 index 00000000..b25448a8 --- /dev/null +++ b/pkg/apiserver/registry/account/watcher.go @@ -0,0 +1,79 @@ +package account + +import ( + "context" + configv1alpha1 "github.com/loft-sh/kiosk/pkg/apis/config/v1alpha1" + "github.com/loft-sh/kiosk/pkg/apis/tenancy" + kioskwatch "github.com/loft-sh/kiosk/pkg/watch" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/klog" + "sync" +) + +var _ kioskwatch.Watcher = &watcher{} + +type watcher struct { + userInfo user.Info + labelSelector labels.Selector + authorizer authorizer.Authorizer + + result chan watch.Event + stopped bool + sync.RWMutex +} + +func (w *watcher) Observe(event watch.Event) { + configAccount, ok := event.Object.(*configv1alpha1.Account) + if !ok { + return + } + + tenancyAccount, err := ConvertConfigAccount(configAccount) + if err != nil { + klog.Infof("Error converting config account: %v", err) + return + } + + event.Object = tenancyAccount + decision, _, err := w.authorizer.Authorize(context.Background(), authorizer.AttributesRecord{ + User: w.userInfo, + ResourceRequest: true, + Verb: "get", + APIGroup: tenancy.SchemeGroupVersion.Group, + APIVersion: "v1alpha1", + Resource: "accounts", + Name: tenancyAccount.Name, + }) + if err != nil || decision != authorizer.DecisionAllow { + return + } + + // check label selector + if w.labelSelector != nil && w.labelSelector.Matches(labels.Set(tenancyAccount.Labels)) == false { + return + } + + // send event + w.RLock() + defer w.RUnlock() + + w.result <- event +} + +func (w *watcher) Stop() { + w.Lock() + defer w.Unlock() + if !w.stopped { + klog.Infof("Stop watching accounts for " + w.userInfo.GetName()) + kioskwatch.AccountRegistry.Unsubscribe(w) + close(w.result) + w.stopped = true + } +} + +func (w *watcher) ResultChan() <-chan watch.Event { + return w.result +} diff --git a/pkg/apiserver/registry/space/rest.go b/pkg/apiserver/registry/space/rest.go index 6848a7b6..5eb195cf 100644 --- a/pkg/apiserver/registry/space/rest.go +++ b/pkg/apiserver/registry/space/rest.go @@ -28,6 +28,7 @@ import ( "github.com/loft-sh/kiosk/pkg/authorization" "github.com/loft-sh/kiosk/pkg/constants" "github.com/loft-sh/kiosk/pkg/util/loghelper" + kioskwatch "github.com/loft-sh/kiosk/pkg/watch" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -36,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/user" authorizer "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/filters" @@ -146,6 +148,27 @@ func (r *spaceStorage) List(ctx context.Context, options *metainternalversion.Li return spaceList, nil } +var _ = rest.Watcher(&spaceStorage{}) + +func (r *spaceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + userInfo, ok := request.UserFrom(ctx) + if !ok { + return nil, fmt.Errorf("user is missing in context") + } + if options == nil { + options = &metainternalversion.ListOptions{} + } + + w := &watcher{ + userInfo: userInfo, + labelSelector: options.LabelSelector, + authorizer: r.authorizer, + result: make(chan watch.Event), + } + kioskwatch.NamespaceRegistry.Subscribe(w) + return w, nil +} + var _ = rest.Getter(&spaceStorage{}) func (r *spaceStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { diff --git a/pkg/apiserver/registry/space/watcher.go b/pkg/apiserver/registry/space/watcher.go new file mode 100644 index 00000000..cb8318db --- /dev/null +++ b/pkg/apiserver/registry/space/watcher.go @@ -0,0 +1,74 @@ +package space + +import ( + "context" + kioskwatch "github.com/loft-sh/kiosk/pkg/watch" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/klog" + "sync" +) + +var _ kioskwatch.Watcher = &watcher{} + +type watcher struct { + userInfo user.Info + labelSelector labels.Selector + authorizer authorizer.Authorizer + + result chan watch.Event + stopped bool + sync.RWMutex +} + +func (w *watcher) Observe(event watch.Event) { + namespace, ok := event.Object.(*corev1.Namespace) + if !ok { + return + } + + space := ConvertNamespace(namespace) + event.Object = space + decision, _, err := w.authorizer.Authorize(context.Background(), authorizer.AttributesRecord{ + User: w.userInfo, + ResourceRequest: true, + Verb: "get", + APIGroup: corev1.SchemeGroupVersion.Group, + APIVersion: corev1.SchemeGroupVersion.Version, + Resource: "namespaces", + Namespace: space.Name, + Name: space.Name, + }) + if err != nil || decision != authorizer.DecisionAllow { + return + } + + // check label selector + if w.labelSelector != nil && w.labelSelector.Matches(labels.Set(space.Labels)) == false { + return + } + + // send event + w.RLock() + defer w.RUnlock() + + w.result <- event +} + +func (w *watcher) Stop() { + w.Lock() + defer w.Unlock() + if !w.stopped { + klog.Infof("Stop watching spaces for " + w.userInfo.GetName()) + kioskwatch.NamespaceRegistry.Unsubscribe(w) + close(w.result) + w.stopped = true + } +} + +func (w *watcher) ResultChan() <-chan watch.Event { + return w.result +} diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go new file mode 100644 index 00000000..e2d19ced --- /dev/null +++ b/pkg/watch/watch.go @@ -0,0 +1,121 @@ +package watch + +import ( + "context" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + kubecache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sync" +) + +var ( + AccountRegistry = NewRegistry() + NamespaceRegistry = NewRegistry() +) + +type Registry interface { + Subscribe(watcher Watcher) + Unsubscribe(watcher Watcher) + Start(ctx context.Context, cache cache.Cache, obj client.Object) error +} + +type Watcher interface { + Observe(event watch.Event) +} + +func NewRegistry() Registry { + return ®istry{watcher: []Watcher{}} +} + +type registry struct { + watcherMutex sync.RWMutex + watcher []Watcher +} + +func (r *registry) Subscribe(watcher Watcher) { + r.watcherMutex.Lock() + defer r.watcherMutex.Unlock() + + r.watcher = append(r.watcher, watcher) +} + +func (r *registry) Unsubscribe(watcher Watcher) { + r.watcherMutex.Lock() + defer r.watcherMutex.Unlock() + + newWatcher := []Watcher{} + for _, w := range r.watcher { + if w != watcher { + newWatcher = append(newWatcher, w) + } + } + + r.watcher = newWatcher +} + +func (r *registry) OnAdd(obj interface{}) { + runtimeObj, ok := obj.(runtime.Object) + if !ok { + return + } + + r.watcherMutex.RLock() + defer r.watcherMutex.RUnlock() + + for _, w := range r.watcher { + w.Observe(watch.Event{ + Type: watch.Added, + Object: runtimeObj, + }) + } +} + +func (r *registry) OnUpdate(oldObj, newObj interface{}) { + runtimeObj, ok := newObj.(runtime.Object) + if !ok { + return + } + + r.watcherMutex.RLock() + defer r.watcherMutex.RUnlock() + + for _, w := range r.watcher { + w.Observe(watch.Event{ + Type: watch.Modified, + Object: runtimeObj, + }) + } +} + +func (r *registry) OnDelete(obj interface{}) { + if deletedFinalStateUnknown, ok := obj.(kubecache.DeletedFinalStateUnknown); ok { + obj = deletedFinalStateUnknown.Obj + } + + runtimeObj, ok := obj.(runtime.Object) + if !ok { + return + } + + r.watcherMutex.RLock() + defer r.watcherMutex.RUnlock() + + for _, w := range r.watcher { + w.Observe(watch.Event{ + Type: watch.Deleted, + Object: runtimeObj, + }) + } +} + +func (r *registry) Start(ctx context.Context, cache cache.Cache, obj client.Object) error { + inf, err := cache.GetInformer(ctx, obj) + if err != nil { + return err + } + + inf.AddEventHandler(r) + return nil +}