Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 67 additions & 66 deletions benchmark/benchmark_informer_supplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,72 +12,6 @@ import (
"github.com/grafana/grafana-app-sdk/simple"
)

// runInformerWithValidation runs an informer and validates that all expected events are received.
// It sets up event handlers, runs the informer, waits for sync, validates event delivery,
// and handles shutdown with the specified timeout.
func runInformerWithValidation(b *testing.B, inf operator.Informer, expectedCount int) {
// Track received events to verify event delivery
var (
receivedCount atomic.Int32
doneCh = make(chan struct{})
)

if err := inf.AddEventHandler(&operator.SimpleWatcher{
AddFunc: func(_ context.Context, obj resource.Object) error {
if int(receivedCount.Add(1)) == expectedCount {
close(doneCh)
}

return nil
},
}); err != nil {
b.Fatalf("failed to add event handler: %v", err)
}

// Create context with cancellation to be able to stop the informer
ctx, cancel := context.WithCancel(b.Context())

// Run informer in background
errCh := make(chan error, 1)
go func() {
errCh <- inf.Run(ctx)
}()

// Wait for the informer to sync fully.
if err := inf.WaitForSync(ctx); err != nil {
cancel()
b.Fatalf("failed to wait for informer to sync: %v", err)
}

// Wait for all events to be processed
select {
case <-doneCh:
// All events received
case <-b.Context().Done():
cancel()
b.Fatalf("timeout waiting for events, got %d of %d", receivedCount.Load(), expectedCount)
}

// Verify all objects were received via event handler
actualCount := int(receivedCount.Load())
if actualCount != expectedCount {
cancel()
b.Fatalf("expected %d objects via event handler, got %d", expectedCount, actualCount)
}

// Stop informer
cancel()

// Wait for shutdown
select {
case <-errCh:
// Clean shutdown
case <-b.Context().Done():
// Shutdown timeout
b.Fatalf("timeout waiting for informer to stop")
}
}

// BenchmarkDefaultInformerSupplier benchmarks the standard K8s informer supplier.
func BenchmarkDefaultInformerSupplier(b *testing.B) {
if err := suppressLogger(); err != nil {
Expand Down Expand Up @@ -176,6 +110,7 @@ func BenchmarkOptimizedInformerSupplier(b *testing.B) {
ListWatchOptions: operator.ListWatchOptions{},
WatchListPageSize: scenario.watchListPageSize,
UseWatchList: scenario.useWatchList,
UseRealFIFO: true,
}

for b.Loop() {
Expand All @@ -191,3 +126,69 @@ func BenchmarkOptimizedInformerSupplier(b *testing.B) {
})
}
}

// runInformerWithValidation runs an informer and validates that all expected events are received.
// It sets up event handlers, runs the informer, waits for sync, validates event delivery,
// and handles shutdown with the specified timeout.
func runInformerWithValidation(b *testing.B, inf operator.Informer, expectedCount int) {
// Track received events to verify event delivery
var (
receivedCount atomic.Int32
doneCh = make(chan struct{})
)

if err := inf.AddEventHandler(&operator.SimpleWatcher{
AddFunc: func(_ context.Context, obj resource.Object) error {
if int(receivedCount.Add(1)) == expectedCount {
close(doneCh)
}

return nil
},
}); err != nil {
b.Fatalf("failed to add event handler: %v", err)
}

// Create context with cancellation to be able to stop the informer
ctx, cancel := context.WithCancel(b.Context())

// Run informer in background
errCh := make(chan error, 1)
go func() {
errCh <- inf.Run(ctx)
}()

// Wait for the informer to sync fully.
if err := inf.WaitForSync(ctx); err != nil {
cancel()
b.Fatalf("failed to wait for informer to sync: %v", err)
}

// Wait for all events to be processed
select {
case <-doneCh:
// All events received
case <-b.Context().Done():
cancel()
b.Fatalf("timeout waiting for events, got %d of %d", receivedCount.Load(), expectedCount)
}

// Verify all objects were received via event handler
actualCount := int(receivedCount.Load())
if actualCount != expectedCount {
cancel()
b.Fatalf("expected %d objects via event handler, got %d", expectedCount, actualCount)
}

// Stop informer
cancel()

// Wait for shutdown
select {
case <-errCh:
// Clean shutdown
case <-b.Context().Done():
// Shutdown timeout
b.Fatalf("timeout waiting for informer to stop")
}
}
75 changes: 38 additions & 37 deletions benchmark/benchmark_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package benchmark_test

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -53,7 +52,7 @@ func newMockRestClient(kind resource.Kind, objects []resource.Object) (rest.Inte
Group: kind.Schema.Group(),
Version: kind.Schema.Version(),
},
NegotiatedSerializer: &k8s.GenericNegotiatedSerializer{},
NegotiatedSerializer: &k8s.KindNegotiatedSerializer{Kind: kind},
},
Transport: transport,
}
Expand Down Expand Up @@ -114,7 +113,10 @@ func (m *mockRoundTripper) handleList(req *http.Request) (*http.Response, error)
}

endIdx := len(m.objects)
var continueToken string
var (
continueToken string
remainingItemCount int64
)

// Handle pagination with limit
if limitStr := query.Get("limit"); limitStr != "" {
Expand All @@ -133,47 +135,46 @@ func (m *mockRoundTripper) handleList(req *http.Request) (*http.Response, error)
if endIdx < len(m.objects) {
continueToken = strconv.Itoa(endIdx)
}
}

// Get codec for marshaling objects
codec := m.kind.Codecs[resource.KindEncodingJSON]

// Marshal objects to JSON using the real codec
items := make([]json.RawMessage, 0, endIdx-startIdx)
for i := startIdx; i < endIdx; i++ {
buf := getBuffer()
if err := codec.Write(buf, m.objects[i]); err != nil {
putBuffer(buf)
return &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`{"error":"marshal error: %v"}`, err)))),
Header: make(http.Header),
}, nil
}
// Copy buffer contents before returning to pool
itemData := make([]byte, buf.Len())
copy(itemData, buf.Bytes())
items = append(items, json.RawMessage(itemData))
putBuffer(buf)
remainingItemCount = int64(len(m.objects) - endIdx)
}

// Create Kubernetes List response format
listResp := map[string]interface{}{
"apiVersion": m.kind.Schema.Group() + "/" + m.kind.Schema.Version(),
"kind": m.kind.Schema.Kind() + "List",
"metadata": map[string]interface{}{
"resourceVersion": "1000",
listResp := &resource.TypedList[resource.Object]{
TypeMeta: metav1.TypeMeta{
APIVersion: schema.GroupVersion{
Group: m.kind.Schema.Group(),
Version: m.kind.Schema.Version(),
}.Identifier(),
Kind: m.kind.Schema.ZeroListValue().GetObjectKind().GroupVersionKind().Kind,
},
"items": items,
ListMeta: metav1.ListMeta{
ResourceVersion: "1000",
},
Items: m.objects[startIdx:endIdx],
}

if continueToken != "" {
listResp["metadata"].(map[string]interface{})["continue"] = continueToken
listResp.Continue = continueToken
}
if remainingItemCount > 0 {
listResp.RemainingItemCount = &remainingItemCount
}

// Marshal the full response
body, err := json.Marshal(listResp)
if err != nil {
buf := getBuffer()
defer putBuffer(buf)

// Get codec for marshaling objects
codec := m.kind.Codec(resource.KindEncodingJSON)
if codec == nil {
return &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(bytes.NewReader([]byte(`{"error":"no codec for KindEncodingJSON"}`))),
Header: make(http.Header),
}, nil
}

// Marshal the response to JSON.
if err := codec.WriteList(buf, listResp); err != nil {
return &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`{"error":"response marshal error: %v"}`, err)))),
Expand All @@ -182,11 +183,11 @@ func (m *mockRoundTripper) handleList(req *http.Request) (*http.Response, error)
}

headers := make(http.Header)
headers.Set("Content-Type", "application/json")
headers.Set("Content-Type", string(resource.KindEncodingJSON))

return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader(body)),
Body: io.NopCloser(buf),
Header: headers,
}, nil
}
Expand Down
22 changes: 15 additions & 7 deletions k8s/negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,15 @@ func (*GenericJSONDecoder) Decode(
}

return into, defaults, nil
case chk.Items != nil: // TODO: the codecs don't know how to handle lists yet.
return nil, nil, fmt.Errorf("unsupported list object")
case chk.Items != nil: // Fallback to UntypedList
l := &UntypedListObjectWrapper{}
if err := json.Unmarshal(data, l); err != nil {
logging.DefaultLogger.Error("error unmarshalling into *k8s.UntypedListObjectWrapper", "error", err)
return into, defaults, fmt.Errorf("error unmarshalling into *k8s.UntypedListObjectWrapper: %w", err)
}

l.items = data
into = l
case chk.Kind != "": // Fallback to UntypedObject
o := &UntypedObjectWrapper{}
if err := json.Unmarshal(data, o); err != nil {
Expand Down Expand Up @@ -221,8 +228,7 @@ func (c *CodecDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, in
return cast, defaults, err
case resource.ListObject:
logging.DefaultLogger.Debug("decoding object into provided resource.ListObject", "gvk", into.GetObjectKind().GroupVersionKind().String())
// TODO: use codec for each element in the list?
err := c.Decoder(data, cast)
err := c.Codec.ReadList(bytes.NewReader(data), cast)
return cast, defaults, err
case *metav1.WatchEvent:
logging.DefaultLogger.Debug("decoding object into provided *v1.WatchEvent", "gvk", into.GetObjectKind().GroupVersionKind().String())
Expand Down Expand Up @@ -275,8 +281,7 @@ func (c *CodecDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, in
logging.DefaultLogger.Warn("no SampleObject set in CodecDecoder, using *resource.TypedList[*resource.UntypedObject]")
obj = &resource.TypedList[*resource.UntypedObject]{}
}
// TODO: use codec for each element in the list?
err = c.Decoder(data, &obj)
err = c.Codec.ReadList(bytes.NewReader(data), obj)
return obj, defaults, err
}

Expand All @@ -298,7 +303,10 @@ func (c *CodecDecoder) Encode(obj runtime.Object, w io.Writer) error {
if cast, ok := obj.(resource.Object); ok {
return c.Codec.Write(w, cast)
}
return errors.New("provided object is not a resource.Object")
if cast, ok := obj.(resource.ListObject); ok {
return c.Codec.WriteList(w, cast)
}
return errors.New("provided object is not a resource.Object or resource.ListObject")
}

// Identifier returns "generic-json-decoder"
Expand Down
22 changes: 22 additions & 0 deletions k8s/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ func (o *UntypedObjectWrapper) Into(into resource.Object, codec resource.Codec)
return codec.Read(bytes.NewReader(o.object), into)
}

// UntypedListObjectWrapper wraps a list of UntypedObjectWrappers, and implements runtime.Object
type UntypedListObjectWrapper struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
items json.RawMessage
}

// DeepCopyObject copies the object
func (o *UntypedListObjectWrapper) DeepCopyObject() runtime.Object {
val := reflect.ValueOf(o).Elem()

cpy := reflect.New(val.Type())
cpy.Elem().Set(val)

// Using the <obj>, <ok> for the type conversion ensures that it doesn't panic if it can't be converted
if obj, ok := cpy.Interface().(runtime.Object); ok {
return obj
}

return nil
}

// UntypedWatchObject implements runtime.Object, and keeps the Object part of a kubernetes watch event as bytes
// when unmarshaled, so that it can later be marshaled into a concrete type with Into().
type UntypedWatchObject struct {
Expand Down
9 changes: 7 additions & 2 deletions operator/concurrentwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ import (
)

var (
initialBufferSize = 1024
// initialBufferSize is the initial capacity for bufferedQueue ring buffers.
// Reduced from 1024 to 64 based on benchmark analysis showing typical queue
// depth stays <20 events. The RingGrowing buffer dynamically expands during
// burst scenarios, providing safety while optimizing for common case.
// Memory savings: ~1.2 GB (17% reduction) in high-concurrency scenarios.
initialBufferSize = 64
)

type eventInfo struct {
Expand Down Expand Up @@ -56,7 +61,7 @@ func newConcurrentWatcher(
cw := &concurrentWatcher{
watcher: watcher,
size: initialPoolSize,
workers: make(map[uint64]*bufferedQueue, initialBufferSize),
workers: make(map[uint64]*bufferedQueue, initialPoolSize),
errorHandler: DefaultErrorHandler,
}
if errorHandler != nil {
Expand Down
Loading
Loading