Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion pkg/controller/runtime/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,22 @@ func (cache *ResourceCache) CachePut(r resource.Resource) {

// CacheRemove handles deleted objects.
func (cache *ResourceCache) CacheRemove(r resource.Resource) {
cache.getHandler(r.Metadata().Namespace(), r.Metadata().Type()).remove(r)
cache.CacheRemoveByPointer(r.Metadata())
}

func (cache *ResourceCache) CacheRemoveByPointer(ptr *resource.Metadata) {
// cache.getHandler(ptr.Namespace(), ptr.Type()).remove(ptr)
tombstone := newCacheTombstone(ptr)
cache.getHandler(ptr.Namespace(), ptr.Type()).put(tombstone)
}

// ClearTombstones removes all tombstones from the cache.
//
// TODO: call this periodically in a goroutine, e.g., in the controller runtime.
//
// TODO: only remove tombstones older than X.
func (cache *ResourceCache) ClearTombstones(namespace resource.Namespace, resourceType resource.Type) {
cache.getHandler(namespace, resourceType).clearTombstones()
}

// WrapState returns a cached wrapped state, which serves some operations from the cache bypassing the underlying state.
Expand All @@ -147,3 +162,57 @@ func (cache *ResourceCache) WrapState(st state.CoreState) state.CoreState {
st: st,
}
}

var _ resource.Resource = (*cacheTombstone)(nil)

// cacheTombstone is a resource without a Spec.
//
// Tombstones are used to present state of a deleted resource.
type cacheTombstone struct {
ref resource.Metadata
}

// newCacheTombstone builds a tombstone from resource reference.
func newCacheTombstone(ref resource.Reference) *cacheTombstone {
return &cacheTombstone{
ref: resource.NewMetadata(ref.Namespace(), ref.Type(), ref.ID(), ref.Version()),
}
}

// String method for debugging/logging.
func (t *cacheTombstone) String() string {
return fmt.Sprintf("cacheTombstone(%s)", t.ref.String())
}

// Metadata for the resource.
//
// Metadata.Version should change each time Spec changes.
func (t *cacheTombstone) Metadata() *resource.Metadata {
return &t.ref
}

// Spec is not implemented for tobmstones.
func (t *cacheTombstone) Spec() any {
panic("tombstone doesn't contain spec")
}

// DeepCopy returns self, as tombstone is immutable.
func (t *cacheTombstone) DeepCopy() resource.Resource { //nolint:ireturn
return t
}

// cacheTombstone implements Tombstoned interface.
func (t *cacheTombstone) cacheTombstone() {
}

// Tombstoned is a marker interface for Tombstones.
type cacheTombstoned interface {
cacheTombstone()
}

// IsTombstone checks if resource is represented by the cacheTombstone.
func isCacheTombstone(res resource.Resource) bool {
_, ok := res.(cacheTombstoned)

return ok
}
3 changes: 3 additions & 0 deletions pkg/controller/runtime/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func TestCacheOperations(t *testing.T) {
})

assert.Equal(t, "1000", metrics.CachedResources.Get("A").String())
assert.Equal(t, "10000", metrics.CachedResources.Get("B").String()) // drops do not cause the cache to be cleared due to the tombstone

c.ClearTombstones("b", "B")
assert.Equal(t, "5000", metrics.CachedResources.Get("B").String())
}

Expand Down
40 changes: 25 additions & 15 deletions pkg/controller/runtime/internal/cache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ func (h *cacheHandler) get(ctx context.Context, id resource.ID, opts ...state.Ge
return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined))
}

res := h.resources[idx]
if isCacheTombstone(res) {
return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined))
}

// return a copy of the resource to satisfy State semantics
return h.resources[idx].DeepCopy(), nil
return res.DeepCopy(), nil
}

func (h *cacheHandler) contextWithTeardown(ctx context.Context, id resource.ID) (context.Context, error) {
Expand Down Expand Up @@ -160,6 +165,10 @@ func (h *cacheHandler) list(ctx context.Context, opts ...state.ListOption) (reso
resources := slices.Clone(h.resources)
h.mu.Unlock()

resources = xslices.Filter(resources, func(r resource.Resource) bool {
return !isCacheTombstone(r)
})

// micro optimization: apply filter only if some filters are specified
if !value.IsZero(options.IDQuery) || options.LabelQueries != nil {
resources = xslices.Filter(resources, func(r resource.Resource) bool {
Expand Down Expand Up @@ -190,6 +199,15 @@ func (h *cacheHandler) put(r resource.Resource) {
})

if found {
existing := h.resources[idx]
existingVersion := existing.Metadata().Version()
newVersion := r.Metadata().Version()

stale := !isCacheTombstone(existing) && newVersion.Value() < existingVersion.Value()
if stale {
return
}

h.resources[idx] = r
} else {
h.resources = slices.Insert(h.resources, idx, r)
Expand All @@ -205,24 +223,16 @@ func (h *cacheHandler) put(r resource.Resource) {
}
}

func (h *cacheHandler) remove(r resource.Resource) {
func (h *cacheHandler) clearTombstones() {
h.mu.Lock()
defer h.mu.Unlock()

idx, found := slices.BinarySearchFunc(h.resources, r.Metadata().ID(), func(r resource.Resource, id resource.ID) int {
return cmp.Compare(r.Metadata().ID(), id)
})

if found {
h.resources = slices.Delete(h.resources, idx, idx+1)

metrics.CachedResources.Add(r.Metadata().Type(), -1)
}
before := len(h.resources)
h.resources = slices.DeleteFunc(h.resources, isCacheTombstone)
after := len(h.resources)
delta := after - before

if ch, ok := h.teardownWaiters[r.Metadata().ID()]; ok {
close(ch)
delete(h.teardownWaiters, r.Metadata().ID())
}
metrics.CachedResources.Add(h.key.Type, int64(delta))
}

func (h *cacheHandler) len() int {
Expand Down
56 changes: 53 additions & 3 deletions pkg/controller/runtime/internal/cache/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cache

import (
"context"
"sync"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
Expand All @@ -20,6 +21,8 @@ import (
type stateWrapper struct {
cache *ResourceCache
st state.CoreState

lock sync.Mutex
}

// Check interfaces.
Expand Down Expand Up @@ -49,7 +52,19 @@ func (wrapper *stateWrapper) List(ctx context.Context, r resource.Kind, opts ...
//
// If a resource already exists, Create returns an error.
func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, opts ...state.CreateOption) error {
return wrapper.st.Create(ctx, r, opts...)
wrapper.lock.Lock()
defer wrapper.lock.Unlock()

err := wrapper.st.Create(ctx, r, opts...)
if err != nil {
return err
}

if wrapper.cache.IsHandled(r.Metadata().Namespace(), r.Metadata().Type()) {
wrapper.cache.CachePut(r)
}

return nil
}

// Update a resource.
Expand All @@ -58,15 +73,50 @@ func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, op
// On update current version of resource `new` in the state should match
// the version on the backend, otherwise conflict error is returned.
func (wrapper *stateWrapper) Update(ctx context.Context, newResource resource.Resource, opts ...state.UpdateOption) error {
return wrapper.st.Update(ctx, newResource, opts...)
wrapper.lock.Lock()
defer wrapper.lock.Unlock()

if err := wrapper.st.Update(ctx, newResource, opts...); err != nil {
return err
}

if wrapper.cache.IsHandled(newResource.Metadata().Namespace(), newResource.Metadata().Type()) {
wrapper.cache.CachePut(newResource)
}

return nil
}

// Destroy a resource.
//
// If a resource doesn't exist, error is returned.
// If a resource has pending finalizers, error is returned.
func (wrapper *stateWrapper) Destroy(ctx context.Context, ptr resource.Pointer, opts ...state.DestroyOption) error {
return wrapper.st.Destroy(ctx, ptr, opts...)
wrapper.lock.Lock()
defer wrapper.lock.Unlock()

var cached resource.Resource

if wrapper.cache.IsHandled(ptr.Namespace(), ptr.Type()) {
var err error
if cached, err = wrapper.cache.Get(ctx, ptr); err != nil {
return err
}
}

if err := wrapper.st.Destroy(ctx, ptr, opts...); err != nil {
if cached != nil && state.IsNotFoundError(err) {
wrapper.cache.CacheRemoveByPointer(cached.Metadata())
}

return err
}

if cached != nil {
wrapper.cache.CacheRemoveByPointer(cached.Metadata())
}

return nil
}

// Watch state of a resource by type.
Expand Down
Loading