Skip to content
This repository has been archived by the owner on Apr 25, 2024. It is now read-only.

Commit

Permalink
Merge pull request #131 from FabianKramm/master
Browse files Browse the repository at this point in the history
feat: add watch support for spaces & accounts
  • Loading branch information
FabianKramm committed Jun 16, 2021
2 parents 97db1f8 + 4df3e26 commit 16cb763
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 11 deletions.
12 changes: 12 additions & 0 deletions cmd/kiosk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"github.com/loft-sh/kiosk/pkg/util/certhelper"
"github.com/loft-sh/kiosk/pkg/util/log"
"github.com/loft-sh/kiosk/pkg/util/secret"
"github.com/loft-sh/kiosk/pkg/watch"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/klog"
Expand Down Expand Up @@ -171,6 +173,16 @@ func main() {
// Make sure the manager is synced
mgr.GetCache().WaitForCacheSync(ctx)

// Start watch registries
err = watch.NamespaceRegistry.Start(ctx, mgr.GetCache(), &corev1.Namespace{})
if err != nil {
klog.Fatalf("start namespace informer: %v", err)
}
err = watch.AccountRegistry.Start(ctx, mgr.GetCache(), &configv1alpha1.Account{})
if err != nil {
klog.Fatalf("start account informer: %v", err)
}

// Start the api server
go func() {
if os.Getenv("SERVER_SIDE_APPLY_ENABLED") != "true" {
Expand Down
12 changes: 5 additions & 7 deletions devspace.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: v1beta7
version: v1beta10
images:
controller:
image: kiosksh/kiosktest
Expand Down Expand Up @@ -44,14 +44,12 @@ profiles:
path: deployments[0].helm.values.kiosk.resources
value:
limits:
memory: 100Gi
cpu: "1000"
memory: 4Gi
cpu: "4"
replace:
dev:
interactive:
defaultEnabled: true
terminal:
imageName: controller
terminal:
imageName: controller
sync:
- imageName: controller
excludePaths:
Expand Down
36 changes: 32 additions & 4 deletions pkg/apiserver/registry/account/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,35 @@ import (
config "github.com/loft-sh/kiosk/pkg/apis/config/v1alpha1"
"github.com/loft-sh/kiosk/pkg/apis/tenancy"
"github.com/loft-sh/kiosk/pkg/authorization"
kioskwatch "github.com/loft-sh/kiosk/pkg/watch"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

type accountREST struct {
client client.Client
filter authorization.FilteredLister
client client.Client
filter authorization.FilteredLister
authorizer authorizer.Authorizer
}

// NewAccountREST creates a new account storage that implements the rest interface
func NewAccountREST(cachedClient client.Client, uncachedClient client.Client, scheme *runtime.Scheme) rest.Storage {
ruleClient := authorization.NewRuleClient(cachedClient)
authorizer := rbac.New(ruleClient, ruleClient, ruleClient, ruleClient)
return &accountREST{
client: uncachedClient,
filter: authorization.NewFilteredLister(uncachedClient, rbac.New(ruleClient, ruleClient, ruleClient, ruleClient)),
client: uncachedClient,
authorizer: authorizer,
filter: authorization.NewFilteredLister(uncachedClient, authorizer),
}
}

Expand Down Expand Up @@ -135,6 +142,27 @@ func (r *accountREST) List(ctx context.Context, options *metainternalversion.Lis
return accountList, nil
}

var _ = rest.Watcher(&accountREST{})

func (r *accountREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
userInfo, ok := request.UserFrom(ctx)
if !ok {
return nil, fmt.Errorf("user is missing in context")
}
if options == nil {
options = &metainternalversion.ListOptions{}
}

w := &watcher{
userInfo: userInfo,
labelSelector: options.LabelSelector,
authorizer: r.authorizer,
result: make(chan watch.Event),
}
kioskwatch.AccountRegistry.Subscribe(w)
return w, nil
}

var _ = rest.Getter(&accountREST{})

func (r *accountREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
Expand Down
79 changes: 79 additions & 0 deletions pkg/apiserver/registry/account/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package account

import (
"context"
configv1alpha1 "github.com/loft-sh/kiosk/pkg/apis/config/v1alpha1"
"github.com/loft-sh/kiosk/pkg/apis/tenancy"
kioskwatch "github.com/loft-sh/kiosk/pkg/watch"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/klog"
"sync"
)

var _ kioskwatch.Watcher = &watcher{}

type watcher struct {
userInfo user.Info
labelSelector labels.Selector
authorizer authorizer.Authorizer

result chan watch.Event
stopped bool
sync.RWMutex
}

func (w *watcher) Observe(event watch.Event) {
configAccount, ok := event.Object.(*configv1alpha1.Account)
if !ok {
return
}

tenancyAccount, err := ConvertConfigAccount(configAccount)
if err != nil {
klog.Infof("Error converting config account: %v", err)
return
}

event.Object = tenancyAccount
decision, _, err := w.authorizer.Authorize(context.Background(), authorizer.AttributesRecord{
User: w.userInfo,
ResourceRequest: true,
Verb: "get",
APIGroup: tenancy.SchemeGroupVersion.Group,
APIVersion: "v1alpha1",
Resource: "accounts",
Name: tenancyAccount.Name,
})
if err != nil || decision != authorizer.DecisionAllow {
return
}

// check label selector
if w.labelSelector != nil && w.labelSelector.Matches(labels.Set(tenancyAccount.Labels)) == false {
return
}

// send event
w.RLock()
defer w.RUnlock()

w.result <- event
}

func (w *watcher) Stop() {
w.Lock()
defer w.Unlock()
if !w.stopped {
klog.Infof("Stop watching accounts for " + w.userInfo.GetName())
kioskwatch.AccountRegistry.Unsubscribe(w)
close(w.result)
w.stopped = true
}
}

func (w *watcher) ResultChan() <-chan watch.Event {
return w.result
}
23 changes: 23 additions & 0 deletions pkg/apiserver/registry/space/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/loft-sh/kiosk/pkg/authorization"
"github.com/loft-sh/kiosk/pkg/constants"
"github.com/loft-sh/kiosk/pkg/util/loghelper"
kioskwatch "github.com/loft-sh/kiosk/pkg/watch"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -36,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/user"
authorizer "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/filters"
Expand Down Expand Up @@ -146,6 +148,27 @@ func (r *spaceStorage) List(ctx context.Context, options *metainternalversion.Li
return spaceList, nil
}

var _ = rest.Watcher(&spaceStorage{})

func (r *spaceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
userInfo, ok := request.UserFrom(ctx)
if !ok {
return nil, fmt.Errorf("user is missing in context")
}
if options == nil {
options = &metainternalversion.ListOptions{}
}

w := &watcher{
userInfo: userInfo,
labelSelector: options.LabelSelector,
authorizer: r.authorizer,
result: make(chan watch.Event),
}
kioskwatch.NamespaceRegistry.Subscribe(w)
return w, nil
}

var _ = rest.Getter(&spaceStorage{})

func (r *spaceStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
Expand Down
74 changes: 74 additions & 0 deletions pkg/apiserver/registry/space/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package space

import (
"context"
kioskwatch "github.com/loft-sh/kiosk/pkg/watch"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/klog"
"sync"
)

var _ kioskwatch.Watcher = &watcher{}

type watcher struct {
userInfo user.Info
labelSelector labels.Selector
authorizer authorizer.Authorizer

result chan watch.Event
stopped bool
sync.RWMutex
}

func (w *watcher) Observe(event watch.Event) {
namespace, ok := event.Object.(*corev1.Namespace)
if !ok {
return
}

space := ConvertNamespace(namespace)
event.Object = space
decision, _, err := w.authorizer.Authorize(context.Background(), authorizer.AttributesRecord{
User: w.userInfo,
ResourceRequest: true,
Verb: "get",
APIGroup: corev1.SchemeGroupVersion.Group,
APIVersion: corev1.SchemeGroupVersion.Version,
Resource: "namespaces",
Namespace: space.Name,
Name: space.Name,
})
if err != nil || decision != authorizer.DecisionAllow {
return
}

// check label selector
if w.labelSelector != nil && w.labelSelector.Matches(labels.Set(space.Labels)) == false {
return
}

// send event
w.RLock()
defer w.RUnlock()

w.result <- event
}

func (w *watcher) Stop() {
w.Lock()
defer w.Unlock()
if !w.stopped {
klog.Infof("Stop watching spaces for " + w.userInfo.GetName())
kioskwatch.NamespaceRegistry.Unsubscribe(w)
close(w.result)
w.stopped = true
}
}

func (w *watcher) ResultChan() <-chan watch.Event {
return w.result
}
Loading

0 comments on commit 16cb763

Please sign in to comment.