From c1326e05e92b83c257c94e30dee7fbfc5dda5fc1 Mon Sep 17 00:00:00 2001 From: Rajeev Baliyan <20521486+rbaliyan@users.noreply.github.com> Date: Fri, 8 May 2026 15:09:40 +0530 Subject: [PATCH 1/3] refactor(k8s): decouple store from kubernetes.io dependencies Replace the direct dependency on k8s.io/* with a narrow Client interface (Get/Upsert/Watch/Health) and remove the informer-based local cache. Reads go straight through to the Client and rely on the Manager-level cache for repeats, matching how the postgres and mongodb stores behave. The k8s package now lives in the main config module so a single release covers it. A reference Adapter wrapping kubernetes.Interface lives at k8s/example/ in its own go.mod, kept on the kubernetes client release cadence rather than the config one. --- CLAUDE.md | 29 +- k8s/client.go | 110 +++++++ k8s/example/README.md | 58 ++++ k8s/example/adapter.go | 281 +++++++++++++++++ k8s/{ => example}/go.mod | 18 +- k8s/{ => example}/go.sum | 30 +- k8s/example/main/main.go | 58 ++++ k8s/helpers_test.go | 49 ++- k8s/options.go | 35 +-- k8s/store.go | 661 +++++++++++++-------------------------- k8s/store_test.go | 310 ++++++++++++++++++ k8s/watch.go | 3 +- 12 files changed, 1131 insertions(+), 511 deletions(-) create mode 100644 k8s/client.go create mode 100644 k8s/example/README.md create mode 100644 k8s/example/adapter.go rename k8s/{ => example}/go.mod (80%) rename k8s/{ => example}/go.sum (91%) create mode 100644 k8s/example/main/main.go create mode 100644 k8s/store_test.go diff --git a/CLAUDE.md b/CLAUDE.md index 0164244..28db205 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -70,11 +70,33 @@ config/ │ ├── store.go # Fallback, ReadThrough, WriteThrough strategies │ └── options.go │ -└── live/ # Live config binding - ├── ref.go # Atomic live reference (Ref[T]) - └── binding.go # Mutex-based live binding +├── live/ # Live config binding +│ ├── ref.go # Atomic live reference (Ref[T]) +│ └── binding.go # Mutex-based live binding +│ +└── k8s/ # Kubernetes ConfigMap/Secret store (no k8s.io deps) + ├── client.go # Client interface, Resource, Event, ErrNotFound + ├── store.go # Store implementation built on Client + ├── options.go # Store options + ├── watch.go # Watch fan-out + └── example/ # Reference adapter (separate go.mod, depends on k8s.io/*) + ├── adapter.go # kubernetes.Interface → Client adapter + └── main/ # Runnable demo ``` +### k8s store + +Unlike the postgres or mongodb stores, the k8s store does NOT import `k8s.io/*`. +Instead it depends on a narrow `k8s.Client` interface (Get/Upsert/Watch/Health) +and the caller supplies an adapter — see `k8s/example/` for a reference +implementation built on `kubernetes.Interface`. This keeps the heavy kubernetes +transitive dependency out of `github.com/rbaliyan/config` and lets the store +ride the same release cadence as the rest of the module. + +The store does not maintain a local resource cache: reads call straight through +to the Client and Manager-level caching handles repeats, mirroring postgres / +mongodb behavior. + ## Key Design Decisions ### Value vs Entry @@ -331,6 +353,7 @@ Optional (for specific backends): ## Recent Changes +- **k8s store decoupled from k8s.io/***: The `k8s` package now depends only on a `Client` interface (Get/Upsert/Watch/Health) instead of `kubernetes.Interface`. The previous `k8s/go.mod` is gone — k8s ships in the main module. A reference adapter using `kubernetes.Interface` lives at `k8s/example/` with its own `go.mod`. The store no longer maintains an informer cache; reads call straight through to the Client. - **Removed deprecated `live.Binding`**: Use `live.Ref[T]` instead for lock-free atomic reads. `DefaultPollInterval` and `ErrInvalidTarget` remain in the `live` package. - **Unexported `otel.Metrics`**: The metrics struct and fields are now unexported (`metrics`, `operationCount`, `errorCount`, `operationLatency`) - **Benchmarks**: `benchmark_test.go` provides benchmarks for Manager.Get, Value operations, MarkStale, and Store operations diff --git a/k8s/client.go b/k8s/client.go new file mode 100644 index 0000000..19c1b0c --- /dev/null +++ b/k8s/client.go @@ -0,0 +1,110 @@ +package k8s + +import ( + "context" + "errors" +) + +// Kind identifies which Kubernetes resource type a Resource represents. +// Store routes ConfigMap reads/writes for non-secret keys and Secret reads/writes +// for keys matching the configured secret prefix. +type Kind int + +const ( + // KindConfigMap represents a Kubernetes ConfigMap. + KindConfigMap Kind = iota + // KindSecret represents a Kubernetes Secret. + KindSecret +) + +// String returns the human-readable name of the kind. +func (k Kind) String() string { + switch k { + case KindConfigMap: + return "configmap" + case KindSecret: + return "secret" + default: + return "unknown" + } +} + +// Resource is the kubernetes-agnostic projection of a ConfigMap or Secret used +// by Store. ConfigMap string data and Secret byte data are both represented as +// []byte here; adapters convert between this form and the wire types. +type Resource struct { + // Name is the resource name (e.g., "config-prod" or "config-secrets-prod"). + Name string + // ResourceVersion is the Kubernetes ResourceVersion, used as the value version. + ResourceVersion string + // Annotations carries codec-name annotations populated by the Store. + // Adapters must round-trip annotations exactly. + Annotations map[string]string + // Data is the resource payload keyed by Kubernetes data key (slash converted to dot). + Data map[string][]byte +} + +// EventType describes the type of change observed in a Watch stream. +type EventType int + +const ( + // EventAdd is emitted when a watched resource is created or first observed. + EventAdd EventType = iota + // EventUpdate is emitted when a watched resource is modified. + EventUpdate + // EventDelete is emitted when a watched resource is removed. + EventDelete +) + +// Event is a single change notification produced by Client.Watch. +type Event struct { + // Type is the kind of change. + Type EventType + // Kind identifies whether the change applies to a ConfigMap or a Secret. + Kind Kind + // Namespace is the Kubernetes namespace of the resource. + Namespace string + // Old is the previous resource state. Set on EventUpdate and EventDelete. + Old *Resource + // New is the new resource state. Set on EventAdd and EventUpdate. + New *Resource +} + +// ErrNotFound is returned by Client.Get when the requested resource does not +// exist. Adapters must translate Kubernetes "not found" API errors to this +// sentinel so the Store can surface config.ErrNotFound to callers. +var ErrNotFound = errors.New("k8s resource not found") + +// Client is the kubernetes-facing surface required by Store. It is intentionally +// narrow so that adapters only need to translate a handful of operations and +// the main config module does not depend on k8s.io/* packages. +// +// Implementations must be safe for concurrent use by multiple goroutines. +// +// See k8s/example for a reference adapter built on top of kubernetes.Interface. +type Client interface { + // Get fetches a resource by namespace and name. Implementations must return + // ErrNotFound when the resource does not exist. Other errors are wrapped + // and surfaced verbatim by the Store. + Get(ctx context.Context, kind Kind, namespace, name string) (*Resource, error) + + // Upsert creates the resource if it does not exist, or updates it otherwise. + // The returned Resource carries the post-write ResourceVersion. + Upsert(ctx context.Context, kind Kind, namespace string, r *Resource) (*Resource, error) + + // Watch starts a watch over both ConfigMaps and Secrets in namespace + // (or all namespaces when namespace is ""). The returned channel must: + // + // - Emit one Event per observed add/update/delete. + // - Be closed when ctx is cancelled or the underlying watch terminates. + // - Reconnect transparently on transient errors. + // + // Adapters typically filter to resources whose names begin with the Store's + // managed prefixes ("config-" and "config-secrets-") to avoid noisy events; + // the Store also filters events on its side. + Watch(ctx context.Context, namespace string) (<-chan Event, error) + + // Health performs a lightweight liveness check (e.g., listing namespaces). + // Used by Store.Health. + Health(ctx context.Context) error +} diff --git a/k8s/example/README.md b/k8s/example/README.md new file mode 100644 index 0000000..9ce3a83 --- /dev/null +++ b/k8s/example/README.md @@ -0,0 +1,58 @@ +# Kubernetes adapter example + +This module is a reference implementation of the +[`k8s.Client`](../client.go) interface defined by `github.com/rbaliyan/config/k8s`. +It is kept in a separate Go module so the main `config` module never depends on +`k8s.io/*` packages — config and the k8s store can be released on a single +cadence, while this adapter follows the kubernetes client release cycle. + +## What the adapter provides + +| `k8s.Client` method | Implementation | +|---|---| +| `Get` | `CoreV1().{ConfigMaps,Secrets}.Get` | +| `Upsert` | Create-or-Update on conflict (`IsAlreadyExists` → Update) | +| `Watch` | `CoreV1().{ConfigMaps,Secrets}.Watch` merged into one channel | +| `Health` | `CoreV1().Namespaces.List(Limit: 1)` | + +There is **no informer cache** — reads go straight to the API server and the +config Manager handles repeat-read caching. This mirrors how the postgres and +mongodb stores behave. + +## Wiring it up + +```go +import ( + "github.com/rbaliyan/config/k8s" + kubeclient "github.com/rbaliyan/config/k8s/example" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +cfg, _ := rest.InClusterConfig() // or clientcmd.BuildConfigFromFlags(...) +cs, _ := kubernetes.NewForConfig(cfg) + +store := k8s.NewStore(kubeclient.New(cs), + k8s.WithK8sNamespace("config-system"), // pin to one k8s namespace + k8s.WithSecretKeyPrefix("secret/"), // route secret/* keys to k8s Secrets +) + +if err := store.Connect(ctx); err != nil { ... } +defer store.Close(ctx) +``` + +See [`main/main.go`](./main/main.go) for a runnable end-to-end demo. + +## Notes for production use + +- **Reconnect on watch loss.** The adapter does not automatically resume a + dropped watch. Wrap the upstream `watch.Interface` with + [`watch.NewRetryWatcher`](https://pkg.go.dev/k8s.io/client-go/tools/watch#NewRetryWatcher) + if you need that behavior; the rest of the code remains unchanged. +- **RBAC.** The service account needs `get`, `list`, `watch`, `create`, `update` + on `configmaps` and `secrets` in the target namespace, plus `list` on + `namespaces` for the health check. +- **Annotations.** The store records the codec for each key in an annotation + (`config.rbaliyan.dev/codec-`). The adapter must round-trip annotations + exactly; this implementation does so via the standard ConfigMap/Secret + metadata. diff --git a/k8s/example/adapter.go b/k8s/example/adapter.go new file mode 100644 index 0000000..890e000 --- /dev/null +++ b/k8s/example/adapter.go @@ -0,0 +1,281 @@ +// Package kubeclient is a reference adapter that satisfies the [k8s.Client] +// interface defined by github.com/rbaliyan/config/k8s using the official +// Kubernetes Go client. +// +// The adapter intentionally lives in its own Go module so that the main config +// module does not pull in k8s.io/* dependencies. Copy this file into your own +// project (or import this module directly) to wire a [k8s.Store] up to a real +// cluster. +// +// Usage: +// +// cfg, err := rest.InClusterConfig() // or BuildConfigFromFlags(...) +// if err != nil { ... } +// cs, err := kubernetes.NewForConfig(cfg) +// if err != nil { ... } +// +// client := kubeclient.New(cs) +// store := k8s.NewStore(client, k8s.WithK8sNamespace("config-system")) +// if err := store.Connect(ctx); err != nil { ... } +// defer store.Close(ctx) +package kubeclient + +import ( + "context" + "fmt" + + "github.com/rbaliyan/config/k8s" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" +) + +// Adapter implements [k8s.Client] over kubernetes.Interface. Reads and writes +// go straight to the API server (the Store does not maintain a local cache); +// Watch wraps Kubernetes watch.Interface streams into a single Event channel. +type Adapter struct { + cs kubernetes.Interface +} + +// New constructs an Adapter wrapping the given Kubernetes clientset. +func New(cs kubernetes.Interface) *Adapter { + return &Adapter{cs: cs} +} + +// Get fetches a ConfigMap or Secret by namespace and name. +func (a *Adapter) Get(ctx context.Context, kind k8s.Kind, namespace, name string) (*k8s.Resource, error) { + switch kind { + case k8s.KindConfigMap: + cm, err := a.cs.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, k8s.ErrNotFound + } + return nil, err + } + return cmToResource(cm), nil + case k8s.KindSecret: + sec, err := a.cs.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, k8s.ErrNotFound + } + return nil, err + } + return secretToResource(sec), nil + default: + return nil, fmt.Errorf("kubeclient: unknown kind %v", kind) + } +} + +// Upsert creates or updates a ConfigMap or Secret. +func (a *Adapter) Upsert(ctx context.Context, kind k8s.Kind, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + switch kind { + case k8s.KindConfigMap: + return a.upsertConfigMap(ctx, namespace, r) + case k8s.KindSecret: + return a.upsertSecret(ctx, namespace, r) + default: + return nil, fmt.Errorf("kubeclient: unknown kind %v", kind) + } +} + +func (a *Adapter) upsertConfigMap(ctx context.Context, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.Name, + Namespace: namespace, + Annotations: r.Annotations, + }, + Data: bytesToStringMap(r.Data), + } + created, err := a.cs.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{}) + if err == nil { + return cmToResource(created), nil + } + if !apierrors.IsAlreadyExists(err) { + return nil, err + } + updated, err := a.cs.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + return cmToResource(updated), nil +} + +func (a *Adapter) upsertSecret(ctx context.Context, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + sec := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.Name, + Namespace: namespace, + Annotations: r.Annotations, + }, + Data: r.Data, + } + created, err := a.cs.CoreV1().Secrets(namespace).Create(ctx, sec, metav1.CreateOptions{}) + if err == nil { + return secretToResource(created), nil + } + if !apierrors.IsAlreadyExists(err) { + return nil, err + } + updated, err := a.cs.CoreV1().Secrets(namespace).Update(ctx, sec, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + return secretToResource(updated), nil +} + +// Watch starts watching ConfigMaps and Secrets in namespace and forwards +// add/update/delete events on a single channel. The channel is closed when ctx +// is cancelled or when both upstream watches end. Transient watch errors are +// ignored — re-establishing the watch is the caller's responsibility (or wire +// in client-go's watch.NewRetryWatcher for automatic recovery). +func (a *Adapter) Watch(ctx context.Context, namespace string) (<-chan k8s.Event, error) { + cmW, err := a.cs.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + secW, err := a.cs.CoreV1().Secrets(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + cmW.Stop() + return nil, err + } + + out := make(chan k8s.Event, 32) + go func() { + defer close(out) + defer cmW.Stop() + defer secW.Stop() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-cmW.ResultChan(): + if !ok { + return + } + if e, kind, fwd := translateConfigMapEvent(ev); fwd { + send(ctx, out, k8s.Event{Type: e, Kind: kind, Namespace: namespace, Old: nil, New: cmObjToResource(ev.Object)}) + } + case ev, ok := <-secW.ResultChan(): + if !ok { + return + } + if e, kind, fwd := translateSecretEvent(ev); fwd { + send(ctx, out, k8s.Event{Type: e, Kind: kind, Namespace: namespace, Old: nil, New: secObjToResource(ev.Object)}) + } + } + } + }() + return out, nil +} + +// Health performs a lightweight liveness check by listing namespaces. +func (a *Adapter) Health(ctx context.Context) error { + _, err := a.cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{Limit: 1}) + return err +} + +// --------------------------------------------------------------------------- +// Conversion helpers +// --------------------------------------------------------------------------- + +func cmToResource(cm *corev1.ConfigMap) *k8s.Resource { + if cm == nil { + return nil + } + return &k8s.Resource{ + Name: cm.Name, + ResourceVersion: cm.ResourceVersion, + Annotations: cm.Annotations, + Data: stringToBytesMap(cm.Data), + } +} + +func secretToResource(sec *corev1.Secret) *k8s.Resource { + if sec == nil { + return nil + } + return &k8s.Resource{ + Name: sec.Name, + ResourceVersion: sec.ResourceVersion, + Annotations: sec.Annotations, + Data: sec.Data, + } +} + +func cmObjToResource(obj any) *k8s.Resource { + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil + } + return cmToResource(cm) +} + +func secObjToResource(obj any) *k8s.Resource { + sec, ok := obj.(*corev1.Secret) + if !ok { + return nil + } + return secretToResource(sec) +} + +func translateConfigMapEvent(ev watch.Event) (k8s.EventType, k8s.Kind, bool) { + switch ev.Type { + case watch.Added: + return k8s.EventAdd, k8s.KindConfigMap, true + case watch.Modified: + return k8s.EventUpdate, k8s.KindConfigMap, true + case watch.Deleted: + return k8s.EventDelete, k8s.KindConfigMap, true + default: + return 0, k8s.KindConfigMap, false + } +} + +func translateSecretEvent(ev watch.Event) (k8s.EventType, k8s.Kind, bool) { + switch ev.Type { + case watch.Added: + return k8s.EventAdd, k8s.KindSecret, true + case watch.Modified: + return k8s.EventUpdate, k8s.KindSecret, true + case watch.Deleted: + return k8s.EventDelete, k8s.KindSecret, true + default: + return 0, k8s.KindSecret, false + } +} + +func send(ctx context.Context, out chan<- k8s.Event, ev k8s.Event) { + select { + case out <- ev: + case <-ctx.Done(): + } +} + +func stringToBytesMap(in map[string]string) map[string][]byte { + if in == nil { + return nil + } + out := make(map[string][]byte, len(in)) + for k, v := range in { + out[k] = []byte(v) + } + return out +} + +func bytesToStringMap(in map[string][]byte) map[string]string { + if in == nil { + return nil + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = string(v) + } + return out +} + +var _ k8s.Client = (*Adapter)(nil) diff --git a/k8s/go.mod b/k8s/example/go.mod similarity index 80% rename from k8s/go.mod rename to k8s/example/go.mod index 903fab7..0783bd2 100644 --- a/k8s/go.mod +++ b/k8s/example/go.mod @@ -1,9 +1,9 @@ -module github.com/rbaliyan/config/k8s +module github.com/rbaliyan/config/k8s/example go 1.26.3 require ( - github.com/rbaliyan/config v0.6.0 + github.com/rbaliyan/config v0.0.0 k8s.io/api v0.33.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 @@ -29,14 +29,16 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/net v0.38.0 // indirect + go.mongodb.org/mongo-driver/v2 v2.6.0 // indirect + golang.org/x/net v0.52.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sys v0.42.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.33.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/term v0.41.0 // indirect + golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.9.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + google.golang.org/protobuf v1.36.10 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -48,3 +50,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +replace github.com/rbaliyan/config => ../.. diff --git a/k8s/go.sum b/k8s/example/go.sum similarity index 91% rename from k8s/go.sum rename to k8s/example/go.sum index fb2c7e1..d393a3e 100644 --- a/k8s/go.sum +++ b/k8s/example/go.sum @@ -68,8 +68,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rbaliyan/config v0.6.0 h1:jISVZcp0QciCoKj6/FPicznfKL+fFs0iisRU4Q14rNo= -github.com/rbaliyan/config v0.6.0/go.mod h1:Nax4I1rmQL6l7UTyUCkd6AquuYjv6BrI8L0uoGCTf7I= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -89,8 +87,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8= +go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -100,8 +98,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -110,28 +108,28 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= -golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/k8s/example/main/main.go b/k8s/example/main/main.go new file mode 100644 index 0000000..affb9f6 --- /dev/null +++ b/k8s/example/main/main.go @@ -0,0 +1,58 @@ +// Command kubeclient-demo wires the example Adapter into a [k8s.Store] and +// performs a single set/get round trip. It is intended as documentation in +// runnable form rather than a production starter. +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/rbaliyan/config" + _ "github.com/rbaliyan/config/codec/json" + "github.com/rbaliyan/config/k8s" + kubeclient "github.com/rbaliyan/config/k8s/example" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func main() { + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, _ := os.UserHomeDir() + kubeconfig = filepath.Join(home, ".kube", "config") + } + + cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Fatalf("load kubeconfig: %v", err) + } + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Fatalf("build clientset: %v", err) + } + + store := k8s.NewStore(kubeclient.New(cs), k8s.WithK8sNamespace("default")) + ctx := context.Background() + if err := store.Connect(ctx); err != nil { + log.Fatalf("connect: %v", err) + } + defer func() { _ = store.Close(ctx) }() + + val := config.NewValue(map[string]any{"region": "us-east-1", "tier": "prod"}) + if _, err := store.Set(ctx, "demo", "app/settings", val); err != nil { + log.Fatalf("set: %v", err) + } + + got, err := store.Get(ctx, "demo", "app/settings") + if err != nil { + log.Fatalf("get: %v", err) + } + var out map[string]any + if err := got.Unmarshal(ctx, &out); err != nil { + log.Fatalf("unmarshal: %v", err) + } + fmt.Printf("settings: %+v\n", out) +} diff --git a/k8s/helpers_test.go b/k8s/helpers_test.go index 914abcb..efbc128 100644 --- a/k8s/helpers_test.go +++ b/k8s/helpers_test.go @@ -78,14 +78,20 @@ func TestParseResourceVersion(t *testing.T) { func TestStore_ConfigNamespace(t *testing.T) { s := &Store{} - if got := s.configNamespace("any", "config-prod"); got != "prod" { - t.Errorf("got %q", got) - } - if got := s.configNamespace("any", "config-secrets-prod"); got != "prod" { - t.Errorf("got %q", got) + cases := []struct { + ev Event + want string + }{ + {Event{New: &Resource{Name: "config-prod"}}, "prod"}, + {Event{New: &Resource{Name: "config-secrets-prod"}}, "prod"}, + {Event{Old: &Resource{Name: "config-prod"}}, "prod"}, + {Event{New: &Resource{Name: "unrelated-cm"}}, ""}, + {Event{}, ""}, } - if got := s.configNamespace("any", "unrelated-cm"); got != "" { - t.Errorf("unmanaged name should return empty, got %q", got) + for _, c := range cases { + if got := s.configNamespace(c.ev); got != c.want { + t.Errorf("configNamespace(%+v) = %q, want %q", c.ev, got, c.want) + } } } @@ -106,6 +112,16 @@ func TestStore_K8sNamespace(t *testing.T) { } } +func TestStore_ResourceFor(t *testing.T) { + s := &Store{opts: storeOptions{secretPrefix: "secret/"}} + if kind, name := s.resourceFor("prod", "db/host"); kind != KindConfigMap || name != "config-prod" { + t.Errorf("expected configmap, got %v %q", kind, name) + } + if kind, name := s.resourceFor("prod", "secret/db-password"); kind != KindSecret || name != "config-secrets-prod" { + t.Errorf("expected secret, got %v %q", kind, name) + } +} + func TestDefaultOptions(t *testing.T) { o := defaultOptions() if o.secretPrefix != defaultSecretPrefix { @@ -114,9 +130,6 @@ func TestDefaultOptions(t *testing.T) { if o.watchBufSize != defaultWatchBufSize { t.Errorf("watchBufSize: %d", o.watchBufSize) } - if o.resyncPeriod != defaultResyncPeriod { - t.Errorf("resyncPeriod: %v", o.resyncPeriod) - } } func TestOptions(t *testing.T) { @@ -129,10 +142,6 @@ func TestOptions(t *testing.T) { if o.secretPrefix != "sec/" { t.Error("secretPrefix not set") } - WithResyncPeriod(0)(&o) - if o.resyncPeriod != defaultResyncPeriod { - t.Error("zero resync should be ignored") - } WithWatchBufferSize(-1)(&o) if o.watchBufSize != defaultWatchBufSize { t.Error("negative bufSize should be ignored") @@ -142,3 +151,15 @@ func TestOptions(t *testing.T) { t.Error("bufSize not updated") } } + +func TestKindString(t *testing.T) { + if KindConfigMap.String() != "configmap" { + t.Errorf("KindConfigMap: %q", KindConfigMap.String()) + } + if KindSecret.String() != "secret" { + t.Errorf("KindSecret: %q", KindSecret.String()) + } + if Kind(99).String() != "unknown" { + t.Errorf("unknown kind: %q", Kind(99).String()) + } +} diff --git a/k8s/options.go b/k8s/options.go index d23ac9e..ca7317f 100644 --- a/k8s/options.go +++ b/k8s/options.go @@ -1,26 +1,19 @@ package k8s -import ( - "time" -) - const ( - defaultSecretPrefix = "secret/" - defaultResyncPeriod = 10 * time.Minute - defaultWatchBufSize = 100 + defaultSecretPrefix = "secret/" + defaultWatchBufSize = 100 ) type storeOptions struct { - k8sNamespace string // k8s namespace to watch; "" = all namespaces - secretPrefix string // config keys with this prefix → Secret; default "secret/" - resyncPeriod time.Duration // informer resync; default 10m - watchBufSize int // default 100 + k8sNamespace string // k8s namespace to scope to; "" = all namespaces + secretPrefix string // config keys with this prefix → Secret; default "secret/" + watchBufSize int // per-subscriber buffer for Store.Watch channels; default 100 } func defaultOptions() storeOptions { return storeOptions{ secretPrefix: defaultSecretPrefix, - resyncPeriod: defaultResyncPeriod, watchBufSize: defaultWatchBufSize, } } @@ -29,7 +22,8 @@ func defaultOptions() storeOptions { type Option func(*storeOptions) // WithK8sNamespace restricts the store to a single Kubernetes namespace. -// When empty (the default), the store watches all namespaces. +// When empty (the default), the store operates across all namespaces and the +// config namespace name is used directly as the Kubernetes namespace. func WithK8sNamespace(ns string) Option { return func(o *storeOptions) { o.k8sNamespace = ns @@ -38,6 +32,7 @@ func WithK8sNamespace(ns string) Option { // WithSecretKeyPrefix sets the config key prefix that routes to Kubernetes Secrets. // Keys with this prefix are stored in/read from Secrets; all others use ConfigMaps. +// Set to "" to disable secret routing entirely. // Default is "secret/". func WithSecretKeyPrefix(prefix string) Option { return func(o *storeOptions) { @@ -45,18 +40,8 @@ func WithSecretKeyPrefix(prefix string) Option { } } -// WithResyncPeriod sets the informer resync period. -// Default is 10 minutes. -func WithResyncPeriod(d time.Duration) Option { - return func(o *storeOptions) { - if d > 0 { - o.resyncPeriod = d - } - } -} - -// WithWatchBufferSize sets the buffer size for watch event channels. -// Default is 100. +// WithWatchBufferSize sets the per-subscriber buffer size for Store.Watch +// channels. Default is 100. func WithWatchBufferSize(n int) Option { return func(o *storeOptions) { if n > 0 { diff --git a/k8s/store.go b/k8s/store.go index 4567791..c746162 100644 --- a/k8s/store.go +++ b/k8s/store.go @@ -1,22 +1,38 @@ // Package k8s provides a config.Store backed by Kubernetes ConfigMaps and Secrets. // -// Config namespaces map to Kubernetes namespaces (or to a fixed k8s namespace -// when WithK8sNamespace is set). Each config namespace is backed by one ConfigMap -// named "config-{namespace}" and, for secret-prefixed keys, one Secret named -// "config-secrets-{namespace}". +// Each config namespace is backed by one ConfigMap named "config-{namespace}" and, +// for keys matching the configured secret prefix, one Secret named +// "config-secrets-{namespace}". Config keys map to Kubernetes data keys by +// replacing "/" with "."; the codec used for each value is recorded in a +// per-resource annotation so reads can decode it. // -// Key mapping: config key "/" is replaced by "." to form a valid Kubernetes data key. -// Keys prefixed with opts.secretPrefix (default "secret/") are stored in Secrets; -// all other keys are stored in ConfigMaps. +// # Decoupled from kubernetes.io // -// Watch support is implemented via k8s.io/client-go informers. The informer cache -// is populated on Connect and kept in sync automatically; Get/Find read from the -// local cache without making API calls. +// This package does NOT import any k8s.io/* packages. Instead it depends on a +// narrow [Client] interface that the caller must implement, typically over the +// real Kubernetes Go client. See the k8s/example sub-module for a reference +// adapter built on top of kubernetes.Interface. +// +// # Operational model +// +// Like the postgres and mongodb stores, the k8s store does not maintain a local +// cache of resources. Reads (Get, Find) call straight through to the Client and +// rely on the Manager-level cache for repeat lookups. Watch is fed by +// Client.Watch, which the Store fans out to per-subscriber channels. +// +// # Connect / Close +// +// Connect starts a background goroutine that consumes events from Client.Watch +// and dispatches them to active watchers. The Store does not wait for any +// initial sync — the first Get after Connect performs an API round-trip via +// the Client. Close stops the watch goroutine and closes all subscriber +// channels. package k8s import ( + "bytes" "context" - "fmt" + "errors" "sort" "strconv" "strings" @@ -25,12 +41,6 @@ import ( "time" "github.com/rbaliyan/config" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" ) // configMapPrefix is the prefix for ConfigMap names managed by this store. @@ -39,16 +49,16 @@ const configMapPrefix = "config-" // secretNamePrefix is the prefix for Secret names managed by this store. const secretNamePrefix = "config-secrets-" -// codecAnnotationPrefix is used to round-trip codec names across writes/reads. -// Each k8sKey stored in a ConfigMap/Secret has a matching annotation -// "config.rbaliyan.dev/codec-" with its codec name. +// codecAnnotationPrefix prefixes the per-key codec annotation keys. +// Each data key stored in a ConfigMap/Secret has a matching annotation +// "config.rbaliyan.dev/codec-" naming the codec used to encode it. const codecAnnotationPrefix = "config.rbaliyan.dev/codec-" func codecAnnotationKey(k8sKey string) string { return codecAnnotationPrefix + k8sKey } -// resolveCodec returns the stored codec name for a given k8sKey or "json" as fallback. +// resolveCodec returns the stored codec name for a given k8sKey, or "json" as fallback. func resolveCodec(anns map[string]string, k8sKey string) string { if anns != nil { if c := anns[codecAnnotationKey(k8sKey)]; c != "" { @@ -58,14 +68,18 @@ func resolveCodec(anns map[string]string, k8sKey string) string { return "json" } -// Store implements config.Store backed by Kubernetes ConfigMaps and Secrets. +// Store implements config.Store backed by Kubernetes ConfigMaps and Secrets via +// a [Client] adapter. type Store struct { - client kubernetes.Interface - factory informers.SharedInformerFactory - opts storeOptions + client Client + opts storeOptions + + connected atomic.Bool + closed atomic.Bool - closed atomic.Bool - stopChan chan struct{} + watchCtx context.Context + watchCancel context.CancelFunc + watchDone chan struct{} watchMu sync.RWMutex watchers map[*watchEntry]struct{} @@ -76,9 +90,9 @@ var ( _ config.HealthChecker = (*Store)(nil) ) -// NewStore creates a new k8s-backed config store. +// NewStore creates a new k8s-backed config store using the given Client. // Call Connect before using the store. -func NewStore(client kubernetes.Interface, opts ...Option) *Store { +func NewStore(client Client, opts ...Option) *Store { o := defaultOptions() for _, opt := range opts { opt(&o) @@ -86,7 +100,6 @@ func NewStore(client kubernetes.Interface, opts ...Option) *Store { return &Store{ client: client, opts: o, - stopChan: make(chan struct{}), watchers: make(map[*watchEntry]struct{}), } } @@ -94,69 +107,40 @@ func NewStore(client kubernetes.Interface, opts ...Option) *Store { // BackendName returns the stable backend identifier used in error messages. func (s *Store) BackendName() string { return "k8s" } -// Connect starts the informer factory and waits for the initial cache sync. -// The context deadline/timeout controls how long to wait for cache sync. +// Connect starts the background watch loop. The first read after Connect +// hits the Kubernetes API via the Client; there is no initial cache sync. func (s *Store) Connect(ctx context.Context) error { if s.closed.Load() { return config.ErrStoreClosed } - - var factoryOpts []informers.SharedInformerOption - if s.opts.k8sNamespace != "" { - factoryOpts = append(factoryOpts, informers.WithNamespace(s.opts.k8sNamespace)) - } - - s.factory = informers.NewSharedInformerFactoryWithOptions( - s.client, - s.opts.resyncPeriod, - factoryOpts..., - ) - - cmInformer := s.factory.Core().V1().ConfigMaps().Informer() - secInformer := s.factory.Core().V1().Secrets().Informer() - - if _, err := cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { s.onConfigMapAdd(obj) }, - UpdateFunc: func(old, newObj any) { s.onConfigMapUpdate(old, newObj) }, - DeleteFunc: func(obj any) { s.onConfigMapDelete(obj) }, - }); err != nil { - return config.WrapStoreError("connect", "k8s", "configmap-handler", err) + if !s.connected.CompareAndSwap(false, true) { + return nil // already connected } - if _, err := secInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { s.onSecretAdd(obj) }, - UpdateFunc: func(old, newObj any) { s.onSecretUpdate(old, newObj) }, - DeleteFunc: func(obj any) { s.onSecretDelete(obj) }, - }); err != nil { - return config.WrapStoreError("connect", "k8s", "secret-handler", err) + events, err := s.client.Watch(context.Background(), s.opts.k8sNamespace) + if err != nil { + s.connected.Store(false) + return config.WrapStoreError("connect", "k8s", "watch", err) } - s.factory.Start(s.stopChan) + s.watchCtx, s.watchCancel = context.WithCancel(context.Background()) + s.watchDone = make(chan struct{}) - if !cache.WaitForCacheSync(ctx.Done(), cmInformer.HasSynced, secInformer.HasSynced) { - err := ctx.Err() - if err == nil { - err = fmt.Errorf("cache sync timed out") - } - return config.WrapStoreError("connect", "k8s", "cache-sync", err) - } + go s.runWatch(events) return nil } -// Close stops the informer factory and closes all watch channels. -// Close blocks until informer goroutines have drained so callers can -// safely release any resources they injected into the client. +// Close stops the watch goroutine and closes all subscriber channels. +// Close blocks until the watch goroutine has exited. func (s *Store) Close(_ context.Context) error { if s.closed.Swap(true) { return nil // already closed } - - close(s.stopChan) - - // Shutdown blocks until all informers have fully stopped. It is safe - // to call even if Start was never invoked. - if s.factory != nil { - s.factory.Shutdown() + if s.watchCancel != nil { + s.watchCancel() + } + if s.watchDone != nil { + <-s.watchDone } s.watchMu.Lock() @@ -181,12 +165,11 @@ func (s *Store) Close(_ context.Context) error { } // Get retrieves a configuration value by namespace and key. -// Reads from the informer cache (no API call). func (s *Store) Get(ctx context.Context, namespace, key string) (config.Value, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -198,47 +181,28 @@ func (s *Store) Get(ctx context.Context, namespace, key string) (config.Value, e k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - secName := secretResourceName(namespace) - lister := s.factory.Core().V1().Secrets().Lister() - secret, err := lister.Secrets(k8sNS).Get(secName) - if err != nil { - if apierrors.IsNotFound(err) { - return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - return nil, config.WrapStoreError("get", "k8s", key, err) - } - rawBytes, ok := secret.Data[k8sKey] - if !ok { - return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - return makeValueFromBytes(rawBytes, namespace, key, secret.ResourceVersion, resolveCodec(secret.Annotations, k8sKey)) - } - - cmName := configMapResourceName(namespace) - lister := s.factory.Core().V1().ConfigMaps().Lister() - cm, err := lister.ConfigMaps(k8sNS).Get(cmName) + r, err := s.client.Get(ctx, kind, k8sNS, name) if err != nil { - if apierrors.IsNotFound(err) { + if errors.Is(err, ErrNotFound) { return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} } return nil, config.WrapStoreError("get", "k8s", key, err) } - rawStr, ok := cm.Data[k8sKey] + raw, ok := r.Data[k8sKey] if !ok { return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} } - return makeValueFromString(rawStr, namespace, key, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) + return makeValueFromBytes(ctx, raw, namespace, key, r.ResourceVersion, resolveCodec(r.Annotations, k8sKey)) } // Set creates or updates a configuration value. -// Writes directly to the Kubernetes API (not via informer cache). func (s *Store) Set(ctx context.Context, namespace, key string, value config.Value) (config.Value, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -256,89 +220,19 @@ func (s *Store) Set(ctx context.Context, namespace, key string, value config.Val k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) codecName := value.Codec() + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - return s.setSecret(ctx, namespace, key, k8sNS, k8sKey, codecName, data) - } - return s.setConfigMap(ctx, namespace, key, k8sNS, k8sKey, codecName, string(data)) -} - -func (s *Store) setConfigMap(ctx context.Context, namespace, key, k8sNS, k8sKey, codecName, strVal string) (config.Value, error) { - cmName := configMapResourceName(namespace) - cm, err := s.client.CoreV1().ConfigMaps(k8sNS).Get(ctx, cmName, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { + existing, err := s.client.Get(ctx, kind, k8sNS, name) + if err != nil && !errors.Is(err, ErrNotFound) { return nil, config.WrapStoreError("set", "k8s", key, err) } - if err != nil { - // Create a new ConfigMap. - cm = &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: cmName, - Namespace: k8sNS, - Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, - }, - Data: map[string]string{k8sKey: strVal}, - } - created, createErr := s.client.CoreV1().ConfigMaps(k8sNS).Create(ctx, cm, metav1.CreateOptions{}) - if createErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, createErr) - } - return makeValueFromString(strVal, namespace, key, created.ResourceVersion, codecName) - } - - if cm.Data == nil { - cm.Data = make(map[string]string) - } - if cm.Annotations == nil { - cm.Annotations = make(map[string]string) - } - cm.Data[k8sKey] = strVal - cm.Annotations[codecAnnotationKey(k8sKey)] = codecName - updated, updateErr := s.client.CoreV1().ConfigMaps(k8sNS).Update(ctx, cm, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, updateErr) - } - return makeValueFromString(strVal, namespace, key, updated.ResourceVersion, codecName) -} - -func (s *Store) setSecret(ctx context.Context, namespace, key, k8sNS, k8sKey, codecName string, rawBytes []byte) (config.Value, error) { - secName := secretResourceName(namespace) - sec, err := s.client.CoreV1().Secrets(k8sNS).Get(ctx, secName, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return nil, config.WrapStoreError("set", "k8s", key, err) - } + r := mergeResource(existing, name, k8sKey, data, codecName) + updated, err := s.client.Upsert(ctx, kind, k8sNS, r) if err != nil { - // Create a new Secret. - sec = &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: secName, - Namespace: k8sNS, - Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, - }, - Data: map[string][]byte{k8sKey: rawBytes}, - } - created, createErr := s.client.CoreV1().Secrets(k8sNS).Create(ctx, sec, metav1.CreateOptions{}) - if createErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, createErr) - } - return makeValueFromBytes(rawBytes, namespace, key, created.ResourceVersion, codecName) - } - - if sec.Data == nil { - sec.Data = make(map[string][]byte) - } - if sec.Annotations == nil { - sec.Annotations = make(map[string]string) - } - sec.Data[k8sKey] = rawBytes - sec.Annotations[codecAnnotationKey(k8sKey)] = codecName - - updated, updateErr := s.client.CoreV1().Secrets(k8sNS).Update(ctx, sec, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, updateErr) + return nil, config.WrapStoreError("set", "k8s", key, err) } - return makeValueFromBytes(rawBytes, namespace, key, updated.ResourceVersion, codecName) + return makeValueFromBytes(ctx, data, namespace, key, updated.ResourceVersion, codecName) } // Delete removes a configuration entry. @@ -346,7 +240,7 @@ func (s *Store) Delete(ctx context.Context, namespace, key string) error { if s.closed.Load() { return config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -358,62 +252,35 @@ func (s *Store) Delete(ctx context.Context, namespace, key string) error { k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - return s.deleteFromSecret(ctx, namespace, key, k8sNS, k8sKey) - } - return s.deleteFromConfigMap(ctx, namespace, key, k8sNS, k8sKey) -} - -func (s *Store) deleteFromConfigMap(ctx context.Context, namespace, key, k8sNS, k8sKey string) error { - cmName := configMapResourceName(namespace) - cm, err := s.client.CoreV1().ConfigMaps(k8sNS).Get(ctx, cmName, metav1.GetOptions{}) + existing, err := s.client.Get(ctx, kind, k8sNS, name) if err != nil { - if apierrors.IsNotFound(err) { + if errors.Is(err, ErrNotFound) { return &config.KeyNotFoundError{Key: key, Namespace: namespace} } return config.WrapStoreError("delete", "k8s", key, err) } - if _, ok := cm.Data[k8sKey]; !ok { + if _, ok := existing.Data[k8sKey]; !ok { return &config.KeyNotFoundError{Key: key, Namespace: namespace} } - delete(cm.Data, k8sKey) - delete(cm.Annotations, codecAnnotationKey(k8sKey)) - _, updateErr := s.client.CoreV1().ConfigMaps(k8sNS).Update(ctx, cm, metav1.UpdateOptions{}) - if updateErr != nil { - return config.WrapStoreError("delete", "k8s", key, updateErr) + delete(existing.Data, k8sKey) + if existing.Annotations != nil { + delete(existing.Annotations, codecAnnotationKey(k8sKey)) } - return nil -} - -func (s *Store) deleteFromSecret(ctx context.Context, namespace, key, k8sNS, k8sKey string) error { - secName := secretResourceName(namespace) - sec, err := s.client.CoreV1().Secrets(k8sNS).Get(ctx, secName, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - return &config.KeyNotFoundError{Key: key, Namespace: namespace} - } + if _, err := s.client.Upsert(ctx, kind, k8sNS, existing); err != nil { return config.WrapStoreError("delete", "k8s", key, err) } - if _, ok := sec.Data[k8sKey]; !ok { - return &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - delete(sec.Data, k8sKey) - delete(sec.Annotations, codecAnnotationKey(k8sKey)) - _, updateErr := s.client.CoreV1().Secrets(k8sNS).Update(ctx, sec, metav1.UpdateOptions{}) - if updateErr != nil { - return config.WrapStoreError("delete", "k8s", key, updateErr) - } return nil } // Find returns a page of configuration entries matching the filter. -// Reads from the informer cache (no API call). +// ConfigMap entries are returned; Secret entries are not enumerated by Find. func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter) (config.Page, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -423,10 +290,7 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter filter = config.NewFilter().Build() } - k8sNS := s.k8sNamespace(namespace) - cmName := configMapResourceName(namespace) - - // Keys mode: exact key lookup. + // Keys mode: per-key Get (handles secret routing automatically). if keys := filter.Keys(); len(keys) > 0 { results := make(map[string]config.Value) for _, key := range keys { @@ -438,34 +302,35 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter return config.NewPage(results, "", 0), nil } - // Prefix mode: scan ConfigMap data. prefix := filter.Prefix() cursor := filter.Cursor() limit := filter.Limit() - lister := s.factory.Core().V1().ConfigMaps().Lister() - cm, err := lister.ConfigMaps(k8sNS).Get(cmName) + k8sNS := s.k8sNamespace(namespace) + cmName := configMapResourceName(namespace) + + cm, err := s.client.Get(ctx, KindConfigMap, k8sNS, cmName) if err != nil { - // No ConfigMap yet — return empty page. - return config.NewPage(make(map[string]config.Value), "", limit), nil + if errors.Is(err, ErrNotFound) { + return config.NewPage(make(map[string]config.Value), "", limit), nil + } + return nil, config.WrapStoreError("find", "k8s", "", err) } - // Collect and sort config keys for consistent pagination. type kv struct { configKey string - rawVal string + raw []byte } var matches []kv - for k8sKey, rawVal := range cm.Data { + for k8sKey, raw := range cm.Data { configKey := k8sKeyToConfigKey(k8sKey) if prefix != "" && !strings.HasPrefix(configKey, prefix) { continue } - // Cursor is the last config key from the previous page. if cursor != "" && configKey <= cursor { continue } - matches = append(matches, kv{configKey: configKey, rawVal: rawVal}) + matches = append(matches, kv{configKey: configKey, raw: raw}) } sort.Slice(matches, func(i, j int) bool { @@ -480,7 +345,7 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter var lastKey string for _, m := range matches { k8sKey := configKeyToK8sKey(m.configKey) - v, valErr := makeValueFromString(m.rawVal, namespace, m.configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) + v, valErr := makeValueFromBytes(ctx, m.raw, namespace, m.configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) if valErr == nil { results[m.configKey] = v lastKey = m.configKey @@ -495,6 +360,9 @@ func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan co if s.closed.Load() { return nil, config.ErrStoreClosed } + if !s.connected.Load() { + return nil, config.ErrStoreNotConnected + } ctx, cancel := context.WithCancel(ctx) ch := make(chan config.ChangeEvent, s.opts.watchBufSize) @@ -513,7 +381,7 @@ func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan co go func() { select { case <-ctx.Done(): - case <-s.stopChan: + case <-s.watchCtx.Done(): } s.watchMu.Lock() @@ -531,220 +399,105 @@ func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan co return ch, nil } -// Health performs a basic health check by listing namespaces. +// Health performs a health check by delegating to the Client. func (s *Store) Health(ctx context.Context) error { if s.closed.Load() { return config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return config.ErrStoreNotConnected } - _, err := s.client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{Limit: 1}) - if err != nil { + if err := s.client.Health(ctx); err != nil { return config.WrapStoreError("health", "k8s", "", err) } return nil } -// --------------------------------------------------------------------------- -// Informer event handlers -// --------------------------------------------------------------------------- - -func (s *Store) onConfigMapAdd(obj any) { - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - ns := s.configNamespace(cm.Namespace, cm.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey, rawVal := range cm.Data { - configKey := k8sKeyToConfigKey(k8sKey) - v, err := makeValueFromString(rawVal, ns, configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) - if err != nil { - continue +// runWatch consumes events from the Client and dispatches per-key change events +// to subscribers. It exits when watchCtx is cancelled or events is closed. +func (s *Store) runWatch(events <-chan Event) { + defer close(s.watchDone) + for { + select { + case <-s.watchCtx.Done(): + return + case ev, ok := <-events: + if !ok { + return + } + s.dispatchEvent(ev) } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) } } -func (s *Store) onConfigMapUpdate(old, newObj any) { - oldCM, ok1 := old.(*corev1.ConfigMap) - newCM, ok2 := newObj.(*corev1.ConfigMap) - if !ok1 || !ok2 { - return - } - ns := s.configNamespace(newCM.Namespace, newCM.Name) - if ns == "" { - return - } +// dispatchEvent translates a single resource-level Event into per-key +// config.ChangeEvents and delivers them to matching watchers. +func (s *Store) dispatchEvent(ev Event) { now := time.Now().UTC() - - // Keys added or updated. - for k8sKey, newVal := range newCM.Data { - configKey := k8sKeyToConfigKey(k8sKey) - oldVal, existed := oldCM.Data[k8sKey] - if !existed || oldVal != newVal { - v, err := makeValueFromString(newVal, ns, configKey, newCM.ResourceVersion, resolveCodec(newCM.Annotations, k8sKey)) - if err != nil { - continue - } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) - } - } - - // Keys deleted. - for k8sKey := range oldCM.Data { - if _, ok := newCM.Data[k8sKey]; !ok { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } + ns := s.configNamespace(ev) + if ns == "" { + return // not a resource we manage } -} -func (s *Store) onConfigMapDelete(obj any) { - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - // Handle DeletedFinalStateUnknown tombstone. - tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown) - if !isTombstone { + switch ev.Type { + case EventAdd: + if ev.New == nil { return } - cm, ok = tombstone.Obj.(*corev1.ConfigMap) - if !ok { + for k8sKey, raw := range ev.New.Data { + s.emitSet(ns, ev.New, k8sKey, raw, now) + } + case EventUpdate: + if ev.New == nil { return } - } - ns := s.configNamespace(cm.Namespace, cm.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey := range cm.Data { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } -} - -func (s *Store) onSecretAdd(obj any) { - sec, ok := obj.(*corev1.Secret) - if !ok { - return - } - ns := s.configNamespace(sec.Namespace, sec.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey, rawBytes := range sec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - v, err := makeValueFromBytes(rawBytes, ns, configKey, sec.ResourceVersion, resolveCodec(sec.Annotations, k8sKey)) - if err != nil { - continue + var oldData map[string][]byte + if ev.Old != nil { + oldData = ev.Old.Data } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) - } -} - -func (s *Store) onSecretUpdate(old, newObj any) { - oldSec, ok1 := old.(*corev1.Secret) - newSec, ok2 := newObj.(*corev1.Secret) - if !ok1 || !ok2 { - return - } - ns := s.configNamespace(newSec.Namespace, newSec.Name) - if ns == "" { - return - } - now := time.Now().UTC() - - for k8sKey, newBytes := range newSec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - oldBytes, existed := oldSec.Data[k8sKey] - if !existed || string(oldBytes) != string(newBytes) { - v, err := makeValueFromBytes(newBytes, ns, configKey, newSec.ResourceVersion, resolveCodec(newSec.Annotations, k8sKey)) - if err != nil { - continue + for k8sKey, newRaw := range ev.New.Data { + oldRaw, existed := oldData[k8sKey] + if !existed || !bytes.Equal(oldRaw, newRaw) { + s.emitSet(ns, ev.New, k8sKey, newRaw, now) } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) } - } - - for k8sKey := range oldSec.Data { - if _, ok := newSec.Data[k8sKey]; !ok { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) + for k8sKey := range oldData { + if _, ok := ev.New.Data[k8sKey]; !ok { + s.emitDelete(ns, k8sKey, now) + } } - } -} - -func (s *Store) onSecretDelete(obj any) { - sec, ok := obj.(*corev1.Secret) - if !ok { - tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown) - if !isTombstone { + case EventDelete: + if ev.Old == nil { return } - sec, ok = tombstone.Obj.(*corev1.Secret) - if !ok { - return + for k8sKey := range ev.Old.Data { + s.emitDelete(ns, k8sKey, now) } } - ns := s.configNamespace(sec.Namespace, sec.Name) - if ns == "" { +} + +func (s *Store) emitSet(ns string, r *Resource, k8sKey string, raw []byte, ts time.Time) { + configKey := k8sKeyToConfigKey(k8sKey) + v, err := makeValueFromBytes(context.Background(), raw, ns, configKey, r.ResourceVersion, resolveCodec(r.Annotations, k8sKey)) + if err != nil { return } - now := time.Now().UTC() - for k8sKey := range sec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } + s.notifyWatchers(config.ChangeEvent{ + Type: config.ChangeTypeSet, + Namespace: ns, + Key: configKey, + Value: v, + Timestamp: ts, + }) +} + +func (s *Store) emitDelete(ns, k8sKey string, ts time.Time) { + s.notifyWatchers(config.ChangeEvent{ + Type: config.ChangeTypeDelete, + Namespace: ns, + Key: k8sKeyToConfigKey(k8sKey), + Timestamp: ts, + }) } // --------------------------------------------------------------------------- @@ -752,8 +505,6 @@ func (s *Store) onSecretDelete(obj any) { // --------------------------------------------------------------------------- // k8sNamespace returns the Kubernetes namespace to use for a given config namespace. -// If opts.k8sNamespace is set, all config namespaces map to it. -// Otherwise the config namespace name is used directly. func (s *Store) k8sNamespace(configNS string) string { if s.opts.k8sNamespace != "" { return s.opts.k8sNamespace @@ -764,14 +515,31 @@ func (s *Store) k8sNamespace(configNS string) string { return configNS } -// configNamespace reverses the k8s resource name back to the config namespace. -// Returns "" if the resource is not managed by this store. -func (s *Store) configNamespace(k8sNS, resourceName string) string { - if strings.HasPrefix(resourceName, secretNamePrefix) { - return strings.TrimPrefix(resourceName, secretNamePrefix) +// resourceFor returns the kind and resource name backing the given config key. +func (s *Store) resourceFor(namespace, key string) (Kind, string) { + if isSecretKey(key, s.opts.secretPrefix) { + return KindSecret, secretResourceName(namespace) } - if strings.HasPrefix(resourceName, configMapPrefix) { - return strings.TrimPrefix(resourceName, configMapPrefix) + return KindConfigMap, configMapResourceName(namespace) +} + +// configNamespace recovers the config namespace from a watched event. +// Returns "" if the resource is not managed by this store. +func (s *Store) configNamespace(ev Event) string { + var name string + switch { + case ev.New != nil: + name = ev.New.Name + case ev.Old != nil: + name = ev.Old.Name + default: + return "" + } + if rest, ok := strings.CutPrefix(name, secretNamePrefix); ok { + return rest + } + if rest, ok := strings.CutPrefix(name, configMapPrefix); ok { + return rest } return "" } @@ -802,34 +570,37 @@ func isSecretKey(key, prefix string) bool { return prefix != "" && strings.HasPrefix(key, prefix) } -// makeValueFromString creates a config.Value from a ConfigMap string value. -// codecName should be the value's codec as recorded in the ConfigMap annotations; -// fall back to "json" if unknown. -func makeValueFromString(rawStr, namespace, key, resourceVersion, codecName string) (config.Value, error) { - if codecName == "" { - codecName = "json" +// mergeResource constructs a Resource for an Upsert: clones existing or builds +// a new one with the given data key and codec annotation set. +func mergeResource(existing *Resource, name, k8sKey string, raw []byte, codecName string) *Resource { + if existing == nil { + return &Resource{ + Name: name, + Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, + Data: map[string][]byte{k8sKey: raw}, + } } - version := parseResourceVersion(resourceVersion) - entryID := namespace + "/" + key - return config.NewValueFromBytes( - context.Background(), - []byte(rawStr), - codecName, - config.WithValueMetadata(version, time.Time{}, time.Time{}), - config.WithValueEntryID(entryID), - ) + if existing.Data == nil { + existing.Data = make(map[string][]byte) + } + if existing.Annotations == nil { + existing.Annotations = make(map[string]string) + } + existing.Data[k8sKey] = raw + existing.Annotations[codecAnnotationKey(k8sKey)] = codecName + return existing } -// makeValueFromBytes creates a config.Value from a Secret byte value. -func makeValueFromBytes(rawBytes []byte, namespace, key, resourceVersion, codecName string) (config.Value, error) { +// makeValueFromBytes builds a config.Value from raw bytes plus k8s metadata. +func makeValueFromBytes(ctx context.Context, raw []byte, namespace, key, resourceVersion, codecName string) (config.Value, error) { if codecName == "" { codecName = "json" } version := parseResourceVersion(resourceVersion) entryID := namespace + "/" + key return config.NewValueFromBytes( - context.Background(), - rawBytes, + ctx, + raw, codecName, config.WithValueMetadata(version, time.Time{}, time.Time{}), config.WithValueEntryID(entryID), diff --git a/k8s/store_test.go b/k8s/store_test.go new file mode 100644 index 0000000..711eeea --- /dev/null +++ b/k8s/store_test.go @@ -0,0 +1,310 @@ +package k8s + +import ( + "context" + "errors" + "maps" + "sync" + "testing" + "time" + + "github.com/rbaliyan/config" + _ "github.com/rbaliyan/config/codec/json" +) + +// fakeClient is an in-memory Client used by tests. It mimics enough of +// kubernetes' ConfigMap/Secret semantics to exercise Store end-to-end. +type fakeClient struct { + mu sync.Mutex + cms map[string]map[string]*Resource // namespace -> name -> resource + secrets map[string]map[string]*Resource + rv int + events chan Event + healthOK bool +} + +func newFakeClient() *fakeClient { + return &fakeClient{ + cms: map[string]map[string]*Resource{}, + secrets: map[string]map[string]*Resource{}, + events: make(chan Event, 32), + healthOK: true, + } +} + +func (f *fakeClient) bucket(kind Kind) map[string]map[string]*Resource { + if kind == KindSecret { + return f.secrets + } + return f.cms +} + +func (f *fakeClient) Get(_ context.Context, kind Kind, namespace, name string) (*Resource, error) { + f.mu.Lock() + defer f.mu.Unlock() + ns := f.bucket(kind)[namespace] + if ns == nil { + return nil, ErrNotFound + } + r := ns[name] + if r == nil { + return nil, ErrNotFound + } + return cloneResource(r), nil +} + +func (f *fakeClient) Upsert(_ context.Context, kind Kind, namespace string, r *Resource) (*Resource, error) { + f.mu.Lock() + defer f.mu.Unlock() + bucket := f.bucket(kind) + if bucket[namespace] == nil { + bucket[namespace] = map[string]*Resource{} + } + old := bucket[namespace][r.Name] + f.rv++ + stored := cloneResource(r) + stored.ResourceVersion = itoa(f.rv) + bucket[namespace][r.Name] = stored + + ev := Event{Kind: kind, Namespace: namespace, New: cloneResource(stored)} + if old == nil { + ev.Type = EventAdd + } else { + ev.Type = EventUpdate + ev.Old = cloneResource(old) + } + select { + case f.events <- ev: + default: + } + return cloneResource(stored), nil +} + +func (f *fakeClient) Watch(ctx context.Context, _ string) (<-chan Event, error) { + out := make(chan Event, 32) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-f.events: + if !ok { + return + } + select { + case out <- ev: + case <-ctx.Done(): + return + } + } + } + }() + return out, nil +} + +func (f *fakeClient) Health(_ context.Context) error { + if !f.healthOK { + return errors.New("unhealthy") + } + return nil +} + +func cloneResource(r *Resource) *Resource { + if r == nil { + return nil + } + out := &Resource{ + Name: r.Name, + ResourceVersion: r.ResourceVersion, + } + if r.Annotations != nil { + out.Annotations = make(map[string]string, len(r.Annotations)) + maps.Copy(out.Annotations, r.Annotations) + } + if r.Data != nil { + out.Data = make(map[string][]byte, len(r.Data)) + for k, v := range r.Data { + b := make([]byte, len(v)) + copy(b, v) + out.Data[k] = b + } + } + return out +} + +func itoa(n int) string { + const digits = "0123456789" + if n == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = digits[n%10] + n /= 10 + } + return string(buf[i:]) +} + +func mustConnect(t *testing.T, s *Store) { + t.Helper() + if err := s.Connect(context.Background()); err != nil { + t.Fatalf("Connect: %v", err) + } + t.Cleanup(func() { _ = s.Close(context.Background()) }) +} + +func TestStore_SetGetDelete_ConfigMap(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + val := config.NewValue(map[string]any{"host": "localhost", "port": 5432}) + stored, err := s.Set(ctx, "prod", "db/conn", val) + if err != nil { + t.Fatalf("Set: %v", err) + } + if stored.Metadata().Version() == 0 { + t.Errorf("expected non-zero version, got 0") + } + + got, err := s.Get(ctx, "prod", "db/conn") + if err != nil { + t.Fatalf("Get: %v", err) + } + var out map[string]any + if err := got.Unmarshal(ctx, &out); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if out["host"] != "localhost" { + t.Errorf("expected localhost, got %v", out["host"]) + } + + if err := s.Delete(ctx, "prod", "db/conn"); err != nil { + t.Fatalf("Delete: %v", err) + } + if _, err := s.Get(ctx, "prod", "db/conn"); !config.IsNotFound(err) { + t.Errorf("expected not found, got %v", err) + } +} + +func TestStore_SecretRouting(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + val := config.NewValue("super-secret") + if _, err := s.Set(ctx, "prod", "secret/api-key", val); err != nil { + t.Fatalf("Set: %v", err) + } + if _, ok := fc.secrets["prod"]["config-secrets-prod"]; !ok { + t.Errorf("expected secret resource to exist; secrets=%+v", fc.secrets) + } + if _, ok := fc.cms["prod"]["config-prod"]; ok { + t.Errorf("did not expect configmap for secret-prefixed key") + } +} + +func TestStore_GetMissing(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + if _, err := s.Get(ctx, "prod", "missing/key"); !config.IsNotFound(err) { + t.Errorf("expected not-found, got %v", err) + } +} + +func TestStore_Find_PrefixAndCursor(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + for _, k := range []string{"app/a", "app/b", "app/c", "other/x"} { + if _, err := s.Set(ctx, "prod", k, config.NewValue(k)); err != nil { + t.Fatalf("Set %s: %v", k, err) + } + } + + page, err := s.Find(ctx, "prod", config.NewFilter().WithPrefix("app/").WithLimit(2).Build()) + if err != nil { + t.Fatalf("Find: %v", err) + } + if len(page.Results()) != 2 { + t.Errorf("expected 2 results, got %d", len(page.Results())) + } + if page.NextCursor() != "app/b" { + t.Errorf("expected cursor app/b, got %q", page.NextCursor()) + } + + next, err := s.Find(ctx, "prod", config.NewFilter().WithPrefix("app/").WithLimit(2).WithCursor(page.NextCursor()).Build()) + if err != nil { + t.Fatalf("Find page2: %v", err) + } + if _, ok := next.Results()["app/c"]; !ok { + t.Errorf("expected app/c in second page; got %+v", next.Results()) + } +} + +func TestStore_Watch_DeliversChanges(t *testing.T) { + ctx := t.Context() + + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + ch, err := s.Watch(ctx, config.WatchFilter{Namespaces: []string{"prod"}}) + if err != nil { + t.Fatalf("Watch: %v", err) + } + + if _, err := s.Set(ctx, "prod", "feature/flag", config.NewValue(true)); err != nil { + t.Fatalf("Set: %v", err) + } + + select { + case ev := <-ch: + if ev.Type != config.ChangeTypeSet || ev.Key != "feature/flag" || ev.Namespace != "prod" { + t.Errorf("unexpected event: %+v", ev) + } + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for watch event") + } +} + +func TestStore_Health(t *testing.T) { + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + if err := s.Health(context.Background()); err != nil { + t.Errorf("Health: %v", err) + } + fc.healthOK = false + if err := s.Health(context.Background()); err == nil { + t.Errorf("expected unhealthy error") + } +} + +func TestStore_NotConnected(t *testing.T) { + s := NewStore(newFakeClient()) + if _, err := s.Get(context.Background(), "prod", "k"); !errors.Is(err, config.ErrStoreNotConnected) { + t.Errorf("expected ErrStoreNotConnected, got %v", err) + } +} + +func TestStore_AfterClose(t *testing.T) { + s := NewStore(newFakeClient()) + mustConnect(t, s) + if err := s.Close(context.Background()); err != nil { + t.Fatalf("Close: %v", err) + } + if _, err := s.Get(context.Background(), "prod", "k"); !errors.Is(err, config.ErrStoreClosed) { + t.Errorf("expected ErrStoreClosed, got %v", err) + } +} diff --git a/k8s/watch.go b/k8s/watch.go index a6cdef2..8e661db 100644 --- a/k8s/watch.go +++ b/k8s/watch.go @@ -7,6 +7,7 @@ import ( "github.com/rbaliyan/config" ) +// watchEntry represents a single subscriber to Store.Watch. type watchEntry struct { filter config.WatchFilter ch chan config.ChangeEvent @@ -37,7 +38,7 @@ func (s *Store) notifyWatchers(event config.ChangeEvent) { } } -// sendToWatcher safely sends an event to a watcher, handling closed channels. +// sendToWatcher safely sends an event to a watcher, dropping on a full channel. func (s *Store) sendToWatcher(we *watchEntry, event config.ChangeEvent) { we.mu.Lock() defer we.mu.Unlock() From 525dede062158c410fb6c2d74dfc80f7ba2965b3 Mon Sep 17 00:00:00 2001 From: Rajeev Baliyan <20521486+rbaliyan@users.noreply.github.com> Date: Fri, 8 May 2026 15:29:03 +0530 Subject: [PATCH 2/3] docs(k8s): tighten interface contract and store godoc Address review feedback on PR #74: - Clarify Find behavior: Keys mode resolves secret-prefixed keys; Prefix mode scans the namespace ConfigMap only. - Document Store.Watch drop-on-full-buffer behavior and connection precondition. - Note that Connect's watch lifetime is bounded by Close, not by ctx. - Soften Client.Watch reconnect requirement to "recommended" so the bundled reference adapter does not violate its own contract; document NewRetryWatcher as the standard way to add resilience. - Mark Event.Old as best-effort; describe Store fallback when Old is nil. - Expand WithK8sNamespace doc with the empty-config-namespace fallback. - Update README ctx declaration so the snippet compiles as written. --- k8s/client.go | 21 +++++++++++++-------- k8s/example/README.md | 4 ++++ k8s/example/adapter.go | 11 +++++++---- k8s/options.go | 9 ++++++--- k8s/store.go | 12 +++++++++++- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/k8s/client.go b/k8s/client.go index 19c1b0c..6dd3516 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -64,7 +64,10 @@ type Event struct { Kind Kind // Namespace is the Kubernetes namespace of the resource. Namespace string - // Old is the previous resource state. Set on EventUpdate and EventDelete. + // Old is the previous resource state. Best-effort: adapters may set it on + // EventUpdate and EventDelete to enable minimal-diff dispatch by the Store, + // but a nil Old is supported — the Store treats every key in New as + // changed and skips delete inference, which is safe but noisier. Old *Resource // New is the new resource state. Set on EventAdd and EventUpdate. New *Resource @@ -93,15 +96,17 @@ type Client interface { Upsert(ctx context.Context, kind Kind, namespace string, r *Resource) (*Resource, error) // Watch starts a watch over both ConfigMaps and Secrets in namespace - // (or all namespaces when namespace is ""). The returned channel must: + // (or all namespaces when namespace is ""). // - // - Emit one Event per observed add/update/delete. - // - Be closed when ctx is cancelled or the underlying watch terminates. - // - Reconnect transparently on transient errors. + // Required: emit one Event per observed add/update/delete; close the + // returned channel when ctx is cancelled or the underlying watch + // terminates. The Store ignores events for resources whose names do not + // match its managed prefixes ("config-" and "config-secrets-"), so + // adapters do not need to filter. // - // Adapters typically filter to resources whose names begin with the Store's - // managed prefixes ("config-" and "config-secrets-") to avoid noisy events; - // the Store also filters events on its side. + // Recommended: reconnect transparently on transient API errors (e.g., via + // k8s.io/client-go/tools/watch.NewRetryWatcher). The bundled reference + // adapter does not, and closes the channel on the first upstream error. Watch(ctx context.Context, namespace string) (<-chan Event, error) // Health performs a lightweight liveness check (e.g., listing namespaces). diff --git a/k8s/example/README.md b/k8s/example/README.md index 9ce3a83..28f43ee 100644 --- a/k8s/example/README.md +++ b/k8s/example/README.md @@ -23,12 +23,16 @@ mongodb stores behave. ```go import ( + "context" + "github.com/rbaliyan/config/k8s" kubeclient "github.com/rbaliyan/config/k8s/example" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) +ctx := context.Background() + cfg, _ := rest.InClusterConfig() // or clientcmd.BuildConfigFromFlags(...) cs, _ := kubernetes.NewForConfig(cfg) diff --git a/k8s/example/adapter.go b/k8s/example/adapter.go index 890e000..3e304be 100644 --- a/k8s/example/adapter.go +++ b/k8s/example/adapter.go @@ -129,10 +129,13 @@ func (a *Adapter) upsertSecret(ctx context.Context, namespace string, r *k8s.Res } // Watch starts watching ConfigMaps and Secrets in namespace and forwards -// add/update/delete events on a single channel. The channel is closed when ctx -// is cancelled or when both upstream watches end. Transient watch errors are -// ignored — re-establishing the watch is the caller's responsibility (or wire -// in client-go's watch.NewRetryWatcher for automatic recovery). +// add/update/delete events on a single channel. The channel is closed when +// ctx is cancelled or when either upstream watch terminates. This adapter +// does NOT auto-reconnect on transient errors — wrap the upstream watches +// with k8s.io/client-go/tools/watch.NewRetryWatcher for resilient operation. +// Old is left nil on every Event; the Store treats every key in New as +// changed and skips delete inference, which is correct but noisier than a +// minimal-diff implementation. func (a *Adapter) Watch(ctx context.Context, namespace string) (<-chan k8s.Event, error) { cmW, err := a.cs.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.ListOptions{}) if err != nil { diff --git a/k8s/options.go b/k8s/options.go index ca7317f..a1bd0b2 100644 --- a/k8s/options.go +++ b/k8s/options.go @@ -21,9 +21,12 @@ func defaultOptions() storeOptions { // Option configures the k8s store. type Option func(*storeOptions) -// WithK8sNamespace restricts the store to a single Kubernetes namespace. -// When empty (the default), the store operates across all namespaces and the -// config namespace name is used directly as the Kubernetes namespace. +// WithK8sNamespace pins all reads and writes to a single Kubernetes namespace. +// +// When set, every config namespace maps to this Kubernetes namespace and +// Watch is scoped to it. When empty (the default), the config namespace name +// is used directly as the Kubernetes namespace; an empty config namespace +// falls back to the Kubernetes "default" namespace. func WithK8sNamespace(ns string) Option { return func(o *storeOptions) { o.k8sNamespace = ns diff --git a/k8s/store.go b/k8s/store.go index c746162..ebde4d5 100644 --- a/k8s/store.go +++ b/k8s/store.go @@ -109,6 +109,8 @@ func (s *Store) BackendName() string { return "k8s" } // Connect starts the background watch loop. The first read after Connect // hits the Kubernetes API via the Client; there is no initial cache sync. +// The watch lifetime is bounded by Close, not by ctx — ctx is only used for +// validation prior to starting the watch. func (s *Store) Connect(ctx context.Context) error { if s.closed.Load() { return config.ErrStoreClosed @@ -275,7 +277,12 @@ func (s *Store) Delete(ctx context.Context, namespace, key string) error { } // Find returns a page of configuration entries matching the filter. -// ConfigMap entries are returned; Secret entries are not enumerated by Find. +// +// In Keys mode (filter.Keys() non-empty), each key is resolved individually +// and secret-prefixed keys are read from the namespace Secret as expected. +// In Prefix/cursor mode, only the namespace ConfigMap is scanned; Secret +// data keys are not enumerated. Use Keys mode to read secret-prefixed keys +// in bulk. func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter) (config.Page, error) { if s.closed.Load() { return nil, config.ErrStoreClosed @@ -356,6 +363,9 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter } // Watch returns a channel that receives change events for matching keys. +// Returns ErrStoreNotConnected if called before Connect. Events are dropped +// when the per-subscriber buffer is full (size set via WithWatchBufferSize). +// The channel is closed when ctx is cancelled or the Store is closed. func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan config.ChangeEvent, error) { if s.closed.Load() { return nil, config.ErrStoreClosed From a2e5bdab6b76949e2be788106180381c920a018d Mon Sep 17 00:00:00 2001 From: Rajeev Baliyan <20521486+rbaliyan@users.noreply.github.com> Date: Fri, 8 May 2026 15:35:08 +0530 Subject: [PATCH 3/3] ci: update gosec workflow for new k8s package layout The k8s package no longer has its own go.mod (it's part of the main module now), so the previous "k8s submodule" gosec step failed with no go.sum. Drop the exclude-dir for k8s and add a parallel step that scans the k8s/example adapter from its own module. --- .github/workflows/security.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index da510f9..8ba6107 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -47,13 +47,13 @@ jobs: run: go install github.com/securego/gosec/v2/cmd/gosec@v2.22.4 - name: Run GoSec - run: gosec -fmt sarif -out gosec-results.sarif -severity medium -confidence medium -exclude-dir k8s ./... + run: gosec -fmt sarif -out gosec-results.sarif -severity medium -confidence medium -exclude-dir k8s/example ./... - - name: Run GoSec (k8s submodule) - working-directory: k8s + - name: Run GoSec (k8s example adapter) + working-directory: k8s/example run: | go mod download - gosec -fmt sarif -out ../gosec-k8s-results.sarif -severity medium -confidence medium ./... + gosec -fmt sarif -out ../../gosec-k8s-example-results.sarif -severity medium -confidence medium ./... - name: Upload SARIF to GitHub Security uses: github/codeql-action/upload-sarif@e46ed2cbd01164d986452f91f178727624ae40d7 # v3 @@ -62,9 +62,9 @@ jobs: sarif_file: gosec-results.sarif category: gosec - - name: Upload SARIF (k8s) to GitHub Security + - name: Upload SARIF (k8s example) to GitHub Security uses: github/codeql-action/upload-sarif@e46ed2cbd01164d986452f91f178727624ae40d7 # v3 if: always() with: - sarif_file: gosec-k8s-results.sarif - category: gosec-k8s + sarif_file: gosec-k8s-example-results.sarif + category: gosec-k8s-example