diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index da510f9..8ba6107 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -47,13 +47,13 @@ jobs: run: go install github.com/securego/gosec/v2/cmd/gosec@v2.22.4 - name: Run GoSec - run: gosec -fmt sarif -out gosec-results.sarif -severity medium -confidence medium -exclude-dir k8s ./... + run: gosec -fmt sarif -out gosec-results.sarif -severity medium -confidence medium -exclude-dir k8s/example ./... - - name: Run GoSec (k8s submodule) - working-directory: k8s + - name: Run GoSec (k8s example adapter) + working-directory: k8s/example run: | go mod download - gosec -fmt sarif -out ../gosec-k8s-results.sarif -severity medium -confidence medium ./... + gosec -fmt sarif -out ../../gosec-k8s-example-results.sarif -severity medium -confidence medium ./... - name: Upload SARIF to GitHub Security uses: github/codeql-action/upload-sarif@e46ed2cbd01164d986452f91f178727624ae40d7 # v3 @@ -62,9 +62,9 @@ jobs: sarif_file: gosec-results.sarif category: gosec - - name: Upload SARIF (k8s) to GitHub Security + - name: Upload SARIF (k8s example) to GitHub Security uses: github/codeql-action/upload-sarif@e46ed2cbd01164d986452f91f178727624ae40d7 # v3 if: always() with: - sarif_file: gosec-k8s-results.sarif - category: gosec-k8s + sarif_file: gosec-k8s-example-results.sarif + category: gosec-k8s-example diff --git a/CLAUDE.md b/CLAUDE.md index 0164244..28db205 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -70,11 +70,33 @@ config/ │ ├── store.go # Fallback, ReadThrough, WriteThrough strategies │ └── options.go │ -└── live/ # Live config binding - ├── ref.go # Atomic live reference (Ref[T]) - └── binding.go # Mutex-based live binding +├── live/ # Live config binding +│ ├── ref.go # Atomic live reference (Ref[T]) +│ └── binding.go # Mutex-based live binding +│ +└── k8s/ # Kubernetes ConfigMap/Secret store (no k8s.io deps) + ├── client.go # Client interface, Resource, Event, ErrNotFound + ├── store.go # Store implementation built on Client + ├── options.go # Store options + ├── watch.go # Watch fan-out + └── example/ # Reference adapter (separate go.mod, depends on k8s.io/*) + ├── adapter.go # kubernetes.Interface → Client adapter + └── main/ # Runnable demo ``` +### k8s store + +Unlike the postgres or mongodb stores, the k8s store does NOT import `k8s.io/*`. +Instead it depends on a narrow `k8s.Client` interface (Get/Upsert/Watch/Health) +and the caller supplies an adapter — see `k8s/example/` for a reference +implementation built on `kubernetes.Interface`. This keeps the heavy kubernetes +transitive dependency out of `github.com/rbaliyan/config` and lets the store +ride the same release cadence as the rest of the module. + +The store does not maintain a local resource cache: reads call straight through +to the Client and Manager-level caching handles repeats, mirroring postgres / +mongodb behavior. + ## Key Design Decisions ### Value vs Entry @@ -331,6 +353,7 @@ Optional (for specific backends): ## Recent Changes +- **k8s store decoupled from k8s.io/***: The `k8s` package now depends only on a `Client` interface (Get/Upsert/Watch/Health) instead of `kubernetes.Interface`. The previous `k8s/go.mod` is gone — k8s ships in the main module. A reference adapter using `kubernetes.Interface` lives at `k8s/example/` with its own `go.mod`. The store no longer maintains an informer cache; reads call straight through to the Client. - **Removed deprecated `live.Binding`**: Use `live.Ref[T]` instead for lock-free atomic reads. `DefaultPollInterval` and `ErrInvalidTarget` remain in the `live` package. - **Unexported `otel.Metrics`**: The metrics struct and fields are now unexported (`metrics`, `operationCount`, `errorCount`, `operationLatency`) - **Benchmarks**: `benchmark_test.go` provides benchmarks for Manager.Get, Value operations, MarkStale, and Store operations diff --git a/k8s/client.go b/k8s/client.go new file mode 100644 index 0000000..6dd3516 --- /dev/null +++ b/k8s/client.go @@ -0,0 +1,115 @@ +package k8s + +import ( + "context" + "errors" +) + +// Kind identifies which Kubernetes resource type a Resource represents. +// Store routes ConfigMap reads/writes for non-secret keys and Secret reads/writes +// for keys matching the configured secret prefix. +type Kind int + +const ( + // KindConfigMap represents a Kubernetes ConfigMap. + KindConfigMap Kind = iota + // KindSecret represents a Kubernetes Secret. + KindSecret +) + +// String returns the human-readable name of the kind. +func (k Kind) String() string { + switch k { + case KindConfigMap: + return "configmap" + case KindSecret: + return "secret" + default: + return "unknown" + } +} + +// Resource is the kubernetes-agnostic projection of a ConfigMap or Secret used +// by Store. ConfigMap string data and Secret byte data are both represented as +// []byte here; adapters convert between this form and the wire types. +type Resource struct { + // Name is the resource name (e.g., "config-prod" or "config-secrets-prod"). + Name string + // ResourceVersion is the Kubernetes ResourceVersion, used as the value version. + ResourceVersion string + // Annotations carries codec-name annotations populated by the Store. + // Adapters must round-trip annotations exactly. + Annotations map[string]string + // Data is the resource payload keyed by Kubernetes data key (slash converted to dot). + Data map[string][]byte +} + +// EventType describes the type of change observed in a Watch stream. +type EventType int + +const ( + // EventAdd is emitted when a watched resource is created or first observed. + EventAdd EventType = iota + // EventUpdate is emitted when a watched resource is modified. + EventUpdate + // EventDelete is emitted when a watched resource is removed. + EventDelete +) + +// Event is a single change notification produced by Client.Watch. +type Event struct { + // Type is the kind of change. + Type EventType + // Kind identifies whether the change applies to a ConfigMap or a Secret. + Kind Kind + // Namespace is the Kubernetes namespace of the resource. + Namespace string + // Old is the previous resource state. Best-effort: adapters may set it on + // EventUpdate and EventDelete to enable minimal-diff dispatch by the Store, + // but a nil Old is supported — the Store treats every key in New as + // changed and skips delete inference, which is safe but noisier. + Old *Resource + // New is the new resource state. Set on EventAdd and EventUpdate. + New *Resource +} + +// ErrNotFound is returned by Client.Get when the requested resource does not +// exist. Adapters must translate Kubernetes "not found" API errors to this +// sentinel so the Store can surface config.ErrNotFound to callers. +var ErrNotFound = errors.New("k8s resource not found") + +// Client is the kubernetes-facing surface required by Store. It is intentionally +// narrow so that adapters only need to translate a handful of operations and +// the main config module does not depend on k8s.io/* packages. +// +// Implementations must be safe for concurrent use by multiple goroutines. +// +// See k8s/example for a reference adapter built on top of kubernetes.Interface. +type Client interface { + // Get fetches a resource by namespace and name. Implementations must return + // ErrNotFound when the resource does not exist. Other errors are wrapped + // and surfaced verbatim by the Store. + Get(ctx context.Context, kind Kind, namespace, name string) (*Resource, error) + + // Upsert creates the resource if it does not exist, or updates it otherwise. + // The returned Resource carries the post-write ResourceVersion. + Upsert(ctx context.Context, kind Kind, namespace string, r *Resource) (*Resource, error) + + // Watch starts a watch over both ConfigMaps and Secrets in namespace + // (or all namespaces when namespace is ""). + // + // Required: emit one Event per observed add/update/delete; close the + // returned channel when ctx is cancelled or the underlying watch + // terminates. The Store ignores events for resources whose names do not + // match its managed prefixes ("config-" and "config-secrets-"), so + // adapters do not need to filter. + // + // Recommended: reconnect transparently on transient API errors (e.g., via + // k8s.io/client-go/tools/watch.NewRetryWatcher). The bundled reference + // adapter does not, and closes the channel on the first upstream error. + Watch(ctx context.Context, namespace string) (<-chan Event, error) + + // Health performs a lightweight liveness check (e.g., listing namespaces). + // Used by Store.Health. + Health(ctx context.Context) error +} diff --git a/k8s/example/README.md b/k8s/example/README.md new file mode 100644 index 0000000..28f43ee --- /dev/null +++ b/k8s/example/README.md @@ -0,0 +1,62 @@ +# Kubernetes adapter example + +This module is a reference implementation of the +[`k8s.Client`](../client.go) interface defined by `github.com/rbaliyan/config/k8s`. +It is kept in a separate Go module so the main `config` module never depends on +`k8s.io/*` packages — config and the k8s store can be released on a single +cadence, while this adapter follows the kubernetes client release cycle. + +## What the adapter provides + +| `k8s.Client` method | Implementation | +|---|---| +| `Get` | `CoreV1().{ConfigMaps,Secrets}.Get` | +| `Upsert` | Create-or-Update on conflict (`IsAlreadyExists` → Update) | +| `Watch` | `CoreV1().{ConfigMaps,Secrets}.Watch` merged into one channel | +| `Health` | `CoreV1().Namespaces.List(Limit: 1)` | + +There is **no informer cache** — reads go straight to the API server and the +config Manager handles repeat-read caching. This mirrors how the postgres and +mongodb stores behave. + +## Wiring it up + +```go +import ( + "context" + + "github.com/rbaliyan/config/k8s" + kubeclient "github.com/rbaliyan/config/k8s/example" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +ctx := context.Background() + +cfg, _ := rest.InClusterConfig() // or clientcmd.BuildConfigFromFlags(...) +cs, _ := kubernetes.NewForConfig(cfg) + +store := k8s.NewStore(kubeclient.New(cs), + k8s.WithK8sNamespace("config-system"), // pin to one k8s namespace + k8s.WithSecretKeyPrefix("secret/"), // route secret/* keys to k8s Secrets +) + +if err := store.Connect(ctx); err != nil { ... } +defer store.Close(ctx) +``` + +See [`main/main.go`](./main/main.go) for a runnable end-to-end demo. + +## Notes for production use + +- **Reconnect on watch loss.** The adapter does not automatically resume a + dropped watch. Wrap the upstream `watch.Interface` with + [`watch.NewRetryWatcher`](https://pkg.go.dev/k8s.io/client-go/tools/watch#NewRetryWatcher) + if you need that behavior; the rest of the code remains unchanged. +- **RBAC.** The service account needs `get`, `list`, `watch`, `create`, `update` + on `configmaps` and `secrets` in the target namespace, plus `list` on + `namespaces` for the health check. +- **Annotations.** The store records the codec for each key in an annotation + (`config.rbaliyan.dev/codec-`). The adapter must round-trip annotations + exactly; this implementation does so via the standard ConfigMap/Secret + metadata. diff --git a/k8s/example/adapter.go b/k8s/example/adapter.go new file mode 100644 index 0000000..3e304be --- /dev/null +++ b/k8s/example/adapter.go @@ -0,0 +1,284 @@ +// Package kubeclient is a reference adapter that satisfies the [k8s.Client] +// interface defined by github.com/rbaliyan/config/k8s using the official +// Kubernetes Go client. +// +// The adapter intentionally lives in its own Go module so that the main config +// module does not pull in k8s.io/* dependencies. Copy this file into your own +// project (or import this module directly) to wire a [k8s.Store] up to a real +// cluster. +// +// Usage: +// +// cfg, err := rest.InClusterConfig() // or BuildConfigFromFlags(...) +// if err != nil { ... } +// cs, err := kubernetes.NewForConfig(cfg) +// if err != nil { ... } +// +// client := kubeclient.New(cs) +// store := k8s.NewStore(client, k8s.WithK8sNamespace("config-system")) +// if err := store.Connect(ctx); err != nil { ... } +// defer store.Close(ctx) +package kubeclient + +import ( + "context" + "fmt" + + "github.com/rbaliyan/config/k8s" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" +) + +// Adapter implements [k8s.Client] over kubernetes.Interface. Reads and writes +// go straight to the API server (the Store does not maintain a local cache); +// Watch wraps Kubernetes watch.Interface streams into a single Event channel. +type Adapter struct { + cs kubernetes.Interface +} + +// New constructs an Adapter wrapping the given Kubernetes clientset. +func New(cs kubernetes.Interface) *Adapter { + return &Adapter{cs: cs} +} + +// Get fetches a ConfigMap or Secret by namespace and name. +func (a *Adapter) Get(ctx context.Context, kind k8s.Kind, namespace, name string) (*k8s.Resource, error) { + switch kind { + case k8s.KindConfigMap: + cm, err := a.cs.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, k8s.ErrNotFound + } + return nil, err + } + return cmToResource(cm), nil + case k8s.KindSecret: + sec, err := a.cs.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, k8s.ErrNotFound + } + return nil, err + } + return secretToResource(sec), nil + default: + return nil, fmt.Errorf("kubeclient: unknown kind %v", kind) + } +} + +// Upsert creates or updates a ConfigMap or Secret. +func (a *Adapter) Upsert(ctx context.Context, kind k8s.Kind, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + switch kind { + case k8s.KindConfigMap: + return a.upsertConfigMap(ctx, namespace, r) + case k8s.KindSecret: + return a.upsertSecret(ctx, namespace, r) + default: + return nil, fmt.Errorf("kubeclient: unknown kind %v", kind) + } +} + +func (a *Adapter) upsertConfigMap(ctx context.Context, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.Name, + Namespace: namespace, + Annotations: r.Annotations, + }, + Data: bytesToStringMap(r.Data), + } + created, err := a.cs.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{}) + if err == nil { + return cmToResource(created), nil + } + if !apierrors.IsAlreadyExists(err) { + return nil, err + } + updated, err := a.cs.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + return cmToResource(updated), nil +} + +func (a *Adapter) upsertSecret(ctx context.Context, namespace string, r *k8s.Resource) (*k8s.Resource, error) { + sec := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.Name, + Namespace: namespace, + Annotations: r.Annotations, + }, + Data: r.Data, + } + created, err := a.cs.CoreV1().Secrets(namespace).Create(ctx, sec, metav1.CreateOptions{}) + if err == nil { + return secretToResource(created), nil + } + if !apierrors.IsAlreadyExists(err) { + return nil, err + } + updated, err := a.cs.CoreV1().Secrets(namespace).Update(ctx, sec, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + return secretToResource(updated), nil +} + +// Watch starts watching ConfigMaps and Secrets in namespace and forwards +// add/update/delete events on a single channel. The channel is closed when +// ctx is cancelled or when either upstream watch terminates. This adapter +// does NOT auto-reconnect on transient errors — wrap the upstream watches +// with k8s.io/client-go/tools/watch.NewRetryWatcher for resilient operation. +// Old is left nil on every Event; the Store treats every key in New as +// changed and skips delete inference, which is correct but noisier than a +// minimal-diff implementation. +func (a *Adapter) Watch(ctx context.Context, namespace string) (<-chan k8s.Event, error) { + cmW, err := a.cs.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + secW, err := a.cs.CoreV1().Secrets(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + cmW.Stop() + return nil, err + } + + out := make(chan k8s.Event, 32) + go func() { + defer close(out) + defer cmW.Stop() + defer secW.Stop() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-cmW.ResultChan(): + if !ok { + return + } + if e, kind, fwd := translateConfigMapEvent(ev); fwd { + send(ctx, out, k8s.Event{Type: e, Kind: kind, Namespace: namespace, Old: nil, New: cmObjToResource(ev.Object)}) + } + case ev, ok := <-secW.ResultChan(): + if !ok { + return + } + if e, kind, fwd := translateSecretEvent(ev); fwd { + send(ctx, out, k8s.Event{Type: e, Kind: kind, Namespace: namespace, Old: nil, New: secObjToResource(ev.Object)}) + } + } + } + }() + return out, nil +} + +// Health performs a lightweight liveness check by listing namespaces. +func (a *Adapter) Health(ctx context.Context) error { + _, err := a.cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{Limit: 1}) + return err +} + +// --------------------------------------------------------------------------- +// Conversion helpers +// --------------------------------------------------------------------------- + +func cmToResource(cm *corev1.ConfigMap) *k8s.Resource { + if cm == nil { + return nil + } + return &k8s.Resource{ + Name: cm.Name, + ResourceVersion: cm.ResourceVersion, + Annotations: cm.Annotations, + Data: stringToBytesMap(cm.Data), + } +} + +func secretToResource(sec *corev1.Secret) *k8s.Resource { + if sec == nil { + return nil + } + return &k8s.Resource{ + Name: sec.Name, + ResourceVersion: sec.ResourceVersion, + Annotations: sec.Annotations, + Data: sec.Data, + } +} + +func cmObjToResource(obj any) *k8s.Resource { + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil + } + return cmToResource(cm) +} + +func secObjToResource(obj any) *k8s.Resource { + sec, ok := obj.(*corev1.Secret) + if !ok { + return nil + } + return secretToResource(sec) +} + +func translateConfigMapEvent(ev watch.Event) (k8s.EventType, k8s.Kind, bool) { + switch ev.Type { + case watch.Added: + return k8s.EventAdd, k8s.KindConfigMap, true + case watch.Modified: + return k8s.EventUpdate, k8s.KindConfigMap, true + case watch.Deleted: + return k8s.EventDelete, k8s.KindConfigMap, true + default: + return 0, k8s.KindConfigMap, false + } +} + +func translateSecretEvent(ev watch.Event) (k8s.EventType, k8s.Kind, bool) { + switch ev.Type { + case watch.Added: + return k8s.EventAdd, k8s.KindSecret, true + case watch.Modified: + return k8s.EventUpdate, k8s.KindSecret, true + case watch.Deleted: + return k8s.EventDelete, k8s.KindSecret, true + default: + return 0, k8s.KindSecret, false + } +} + +func send(ctx context.Context, out chan<- k8s.Event, ev k8s.Event) { + select { + case out <- ev: + case <-ctx.Done(): + } +} + +func stringToBytesMap(in map[string]string) map[string][]byte { + if in == nil { + return nil + } + out := make(map[string][]byte, len(in)) + for k, v := range in { + out[k] = []byte(v) + } + return out +} + +func bytesToStringMap(in map[string][]byte) map[string]string { + if in == nil { + return nil + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = string(v) + } + return out +} + +var _ k8s.Client = (*Adapter)(nil) diff --git a/k8s/go.mod b/k8s/example/go.mod similarity index 80% rename from k8s/go.mod rename to k8s/example/go.mod index 903fab7..0783bd2 100644 --- a/k8s/go.mod +++ b/k8s/example/go.mod @@ -1,9 +1,9 @@ -module github.com/rbaliyan/config/k8s +module github.com/rbaliyan/config/k8s/example go 1.26.3 require ( - github.com/rbaliyan/config v0.6.0 + github.com/rbaliyan/config v0.0.0 k8s.io/api v0.33.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 @@ -29,14 +29,16 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/net v0.38.0 // indirect + go.mongodb.org/mongo-driver/v2 v2.6.0 // indirect + golang.org/x/net v0.52.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sys v0.42.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.33.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/term v0.41.0 // indirect + golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.9.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + google.golang.org/protobuf v1.36.10 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -48,3 +50,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +replace github.com/rbaliyan/config => ../.. diff --git a/k8s/go.sum b/k8s/example/go.sum similarity index 91% rename from k8s/go.sum rename to k8s/example/go.sum index fb2c7e1..d393a3e 100644 --- a/k8s/go.sum +++ b/k8s/example/go.sum @@ -68,8 +68,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rbaliyan/config v0.6.0 h1:jISVZcp0QciCoKj6/FPicznfKL+fFs0iisRU4Q14rNo= -github.com/rbaliyan/config v0.6.0/go.mod h1:Nax4I1rmQL6l7UTyUCkd6AquuYjv6BrI8L0uoGCTf7I= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -89,8 +87,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8= +go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -100,8 +98,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -110,28 +108,28 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= -golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/k8s/example/main/main.go b/k8s/example/main/main.go new file mode 100644 index 0000000..affb9f6 --- /dev/null +++ b/k8s/example/main/main.go @@ -0,0 +1,58 @@ +// Command kubeclient-demo wires the example Adapter into a [k8s.Store] and +// performs a single set/get round trip. It is intended as documentation in +// runnable form rather than a production starter. +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/rbaliyan/config" + _ "github.com/rbaliyan/config/codec/json" + "github.com/rbaliyan/config/k8s" + kubeclient "github.com/rbaliyan/config/k8s/example" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func main() { + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, _ := os.UserHomeDir() + kubeconfig = filepath.Join(home, ".kube", "config") + } + + cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Fatalf("load kubeconfig: %v", err) + } + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + log.Fatalf("build clientset: %v", err) + } + + store := k8s.NewStore(kubeclient.New(cs), k8s.WithK8sNamespace("default")) + ctx := context.Background() + if err := store.Connect(ctx); err != nil { + log.Fatalf("connect: %v", err) + } + defer func() { _ = store.Close(ctx) }() + + val := config.NewValue(map[string]any{"region": "us-east-1", "tier": "prod"}) + if _, err := store.Set(ctx, "demo", "app/settings", val); err != nil { + log.Fatalf("set: %v", err) + } + + got, err := store.Get(ctx, "demo", "app/settings") + if err != nil { + log.Fatalf("get: %v", err) + } + var out map[string]any + if err := got.Unmarshal(ctx, &out); err != nil { + log.Fatalf("unmarshal: %v", err) + } + fmt.Printf("settings: %+v\n", out) +} diff --git a/k8s/helpers_test.go b/k8s/helpers_test.go index 914abcb..efbc128 100644 --- a/k8s/helpers_test.go +++ b/k8s/helpers_test.go @@ -78,14 +78,20 @@ func TestParseResourceVersion(t *testing.T) { func TestStore_ConfigNamespace(t *testing.T) { s := &Store{} - if got := s.configNamespace("any", "config-prod"); got != "prod" { - t.Errorf("got %q", got) - } - if got := s.configNamespace("any", "config-secrets-prod"); got != "prod" { - t.Errorf("got %q", got) + cases := []struct { + ev Event + want string + }{ + {Event{New: &Resource{Name: "config-prod"}}, "prod"}, + {Event{New: &Resource{Name: "config-secrets-prod"}}, "prod"}, + {Event{Old: &Resource{Name: "config-prod"}}, "prod"}, + {Event{New: &Resource{Name: "unrelated-cm"}}, ""}, + {Event{}, ""}, } - if got := s.configNamespace("any", "unrelated-cm"); got != "" { - t.Errorf("unmanaged name should return empty, got %q", got) + for _, c := range cases { + if got := s.configNamespace(c.ev); got != c.want { + t.Errorf("configNamespace(%+v) = %q, want %q", c.ev, got, c.want) + } } } @@ -106,6 +112,16 @@ func TestStore_K8sNamespace(t *testing.T) { } } +func TestStore_ResourceFor(t *testing.T) { + s := &Store{opts: storeOptions{secretPrefix: "secret/"}} + if kind, name := s.resourceFor("prod", "db/host"); kind != KindConfigMap || name != "config-prod" { + t.Errorf("expected configmap, got %v %q", kind, name) + } + if kind, name := s.resourceFor("prod", "secret/db-password"); kind != KindSecret || name != "config-secrets-prod" { + t.Errorf("expected secret, got %v %q", kind, name) + } +} + func TestDefaultOptions(t *testing.T) { o := defaultOptions() if o.secretPrefix != defaultSecretPrefix { @@ -114,9 +130,6 @@ func TestDefaultOptions(t *testing.T) { if o.watchBufSize != defaultWatchBufSize { t.Errorf("watchBufSize: %d", o.watchBufSize) } - if o.resyncPeriod != defaultResyncPeriod { - t.Errorf("resyncPeriod: %v", o.resyncPeriod) - } } func TestOptions(t *testing.T) { @@ -129,10 +142,6 @@ func TestOptions(t *testing.T) { if o.secretPrefix != "sec/" { t.Error("secretPrefix not set") } - WithResyncPeriod(0)(&o) - if o.resyncPeriod != defaultResyncPeriod { - t.Error("zero resync should be ignored") - } WithWatchBufferSize(-1)(&o) if o.watchBufSize != defaultWatchBufSize { t.Error("negative bufSize should be ignored") @@ -142,3 +151,15 @@ func TestOptions(t *testing.T) { t.Error("bufSize not updated") } } + +func TestKindString(t *testing.T) { + if KindConfigMap.String() != "configmap" { + t.Errorf("KindConfigMap: %q", KindConfigMap.String()) + } + if KindSecret.String() != "secret" { + t.Errorf("KindSecret: %q", KindSecret.String()) + } + if Kind(99).String() != "unknown" { + t.Errorf("unknown kind: %q", Kind(99).String()) + } +} diff --git a/k8s/options.go b/k8s/options.go index d23ac9e..a1bd0b2 100644 --- a/k8s/options.go +++ b/k8s/options.go @@ -1,26 +1,19 @@ package k8s -import ( - "time" -) - const ( - defaultSecretPrefix = "secret/" - defaultResyncPeriod = 10 * time.Minute - defaultWatchBufSize = 100 + defaultSecretPrefix = "secret/" + defaultWatchBufSize = 100 ) type storeOptions struct { - k8sNamespace string // k8s namespace to watch; "" = all namespaces - secretPrefix string // config keys with this prefix → Secret; default "secret/" - resyncPeriod time.Duration // informer resync; default 10m - watchBufSize int // default 100 + k8sNamespace string // k8s namespace to scope to; "" = all namespaces + secretPrefix string // config keys with this prefix → Secret; default "secret/" + watchBufSize int // per-subscriber buffer for Store.Watch channels; default 100 } func defaultOptions() storeOptions { return storeOptions{ secretPrefix: defaultSecretPrefix, - resyncPeriod: defaultResyncPeriod, watchBufSize: defaultWatchBufSize, } } @@ -28,8 +21,12 @@ func defaultOptions() storeOptions { // Option configures the k8s store. type Option func(*storeOptions) -// WithK8sNamespace restricts the store to a single Kubernetes namespace. -// When empty (the default), the store watches all namespaces. +// WithK8sNamespace pins all reads and writes to a single Kubernetes namespace. +// +// When set, every config namespace maps to this Kubernetes namespace and +// Watch is scoped to it. When empty (the default), the config namespace name +// is used directly as the Kubernetes namespace; an empty config namespace +// falls back to the Kubernetes "default" namespace. func WithK8sNamespace(ns string) Option { return func(o *storeOptions) { o.k8sNamespace = ns @@ -38,6 +35,7 @@ func WithK8sNamespace(ns string) Option { // WithSecretKeyPrefix sets the config key prefix that routes to Kubernetes Secrets. // Keys with this prefix are stored in/read from Secrets; all others use ConfigMaps. +// Set to "" to disable secret routing entirely. // Default is "secret/". func WithSecretKeyPrefix(prefix string) Option { return func(o *storeOptions) { @@ -45,18 +43,8 @@ func WithSecretKeyPrefix(prefix string) Option { } } -// WithResyncPeriod sets the informer resync period. -// Default is 10 minutes. -func WithResyncPeriod(d time.Duration) Option { - return func(o *storeOptions) { - if d > 0 { - o.resyncPeriod = d - } - } -} - -// WithWatchBufferSize sets the buffer size for watch event channels. -// Default is 100. +// WithWatchBufferSize sets the per-subscriber buffer size for Store.Watch +// channels. Default is 100. func WithWatchBufferSize(n int) Option { return func(o *storeOptions) { if n > 0 { diff --git a/k8s/store.go b/k8s/store.go index 4567791..ebde4d5 100644 --- a/k8s/store.go +++ b/k8s/store.go @@ -1,22 +1,38 @@ // Package k8s provides a config.Store backed by Kubernetes ConfigMaps and Secrets. // -// Config namespaces map to Kubernetes namespaces (or to a fixed k8s namespace -// when WithK8sNamespace is set). Each config namespace is backed by one ConfigMap -// named "config-{namespace}" and, for secret-prefixed keys, one Secret named -// "config-secrets-{namespace}". +// Each config namespace is backed by one ConfigMap named "config-{namespace}" and, +// for keys matching the configured secret prefix, one Secret named +// "config-secrets-{namespace}". Config keys map to Kubernetes data keys by +// replacing "/" with "."; the codec used for each value is recorded in a +// per-resource annotation so reads can decode it. // -// Key mapping: config key "/" is replaced by "." to form a valid Kubernetes data key. -// Keys prefixed with opts.secretPrefix (default "secret/") are stored in Secrets; -// all other keys are stored in ConfigMaps. +// # Decoupled from kubernetes.io // -// Watch support is implemented via k8s.io/client-go informers. The informer cache -// is populated on Connect and kept in sync automatically; Get/Find read from the -// local cache without making API calls. +// This package does NOT import any k8s.io/* packages. Instead it depends on a +// narrow [Client] interface that the caller must implement, typically over the +// real Kubernetes Go client. See the k8s/example sub-module for a reference +// adapter built on top of kubernetes.Interface. +// +// # Operational model +// +// Like the postgres and mongodb stores, the k8s store does not maintain a local +// cache of resources. Reads (Get, Find) call straight through to the Client and +// rely on the Manager-level cache for repeat lookups. Watch is fed by +// Client.Watch, which the Store fans out to per-subscriber channels. +// +// # Connect / Close +// +// Connect starts a background goroutine that consumes events from Client.Watch +// and dispatches them to active watchers. The Store does not wait for any +// initial sync — the first Get after Connect performs an API round-trip via +// the Client. Close stops the watch goroutine and closes all subscriber +// channels. package k8s import ( + "bytes" "context" - "fmt" + "errors" "sort" "strconv" "strings" @@ -25,12 +41,6 @@ import ( "time" "github.com/rbaliyan/config" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" ) // configMapPrefix is the prefix for ConfigMap names managed by this store. @@ -39,16 +49,16 @@ const configMapPrefix = "config-" // secretNamePrefix is the prefix for Secret names managed by this store. const secretNamePrefix = "config-secrets-" -// codecAnnotationPrefix is used to round-trip codec names across writes/reads. -// Each k8sKey stored in a ConfigMap/Secret has a matching annotation -// "config.rbaliyan.dev/codec-" with its codec name. +// codecAnnotationPrefix prefixes the per-key codec annotation keys. +// Each data key stored in a ConfigMap/Secret has a matching annotation +// "config.rbaliyan.dev/codec-" naming the codec used to encode it. const codecAnnotationPrefix = "config.rbaliyan.dev/codec-" func codecAnnotationKey(k8sKey string) string { return codecAnnotationPrefix + k8sKey } -// resolveCodec returns the stored codec name for a given k8sKey or "json" as fallback. +// resolveCodec returns the stored codec name for a given k8sKey, or "json" as fallback. func resolveCodec(anns map[string]string, k8sKey string) string { if anns != nil { if c := anns[codecAnnotationKey(k8sKey)]; c != "" { @@ -58,14 +68,18 @@ func resolveCodec(anns map[string]string, k8sKey string) string { return "json" } -// Store implements config.Store backed by Kubernetes ConfigMaps and Secrets. +// Store implements config.Store backed by Kubernetes ConfigMaps and Secrets via +// a [Client] adapter. type Store struct { - client kubernetes.Interface - factory informers.SharedInformerFactory - opts storeOptions + client Client + opts storeOptions + + connected atomic.Bool + closed atomic.Bool - closed atomic.Bool - stopChan chan struct{} + watchCtx context.Context + watchCancel context.CancelFunc + watchDone chan struct{} watchMu sync.RWMutex watchers map[*watchEntry]struct{} @@ -76,9 +90,9 @@ var ( _ config.HealthChecker = (*Store)(nil) ) -// NewStore creates a new k8s-backed config store. +// NewStore creates a new k8s-backed config store using the given Client. // Call Connect before using the store. -func NewStore(client kubernetes.Interface, opts ...Option) *Store { +func NewStore(client Client, opts ...Option) *Store { o := defaultOptions() for _, opt := range opts { opt(&o) @@ -86,7 +100,6 @@ func NewStore(client kubernetes.Interface, opts ...Option) *Store { return &Store{ client: client, opts: o, - stopChan: make(chan struct{}), watchers: make(map[*watchEntry]struct{}), } } @@ -94,69 +107,42 @@ func NewStore(client kubernetes.Interface, opts ...Option) *Store { // BackendName returns the stable backend identifier used in error messages. func (s *Store) BackendName() string { return "k8s" } -// Connect starts the informer factory and waits for the initial cache sync. -// The context deadline/timeout controls how long to wait for cache sync. +// Connect starts the background watch loop. The first read after Connect +// hits the Kubernetes API via the Client; there is no initial cache sync. +// The watch lifetime is bounded by Close, not by ctx — ctx is only used for +// validation prior to starting the watch. func (s *Store) Connect(ctx context.Context) error { if s.closed.Load() { return config.ErrStoreClosed } - - var factoryOpts []informers.SharedInformerOption - if s.opts.k8sNamespace != "" { - factoryOpts = append(factoryOpts, informers.WithNamespace(s.opts.k8sNamespace)) - } - - s.factory = informers.NewSharedInformerFactoryWithOptions( - s.client, - s.opts.resyncPeriod, - factoryOpts..., - ) - - cmInformer := s.factory.Core().V1().ConfigMaps().Informer() - secInformer := s.factory.Core().V1().Secrets().Informer() - - if _, err := cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { s.onConfigMapAdd(obj) }, - UpdateFunc: func(old, newObj any) { s.onConfigMapUpdate(old, newObj) }, - DeleteFunc: func(obj any) { s.onConfigMapDelete(obj) }, - }); err != nil { - return config.WrapStoreError("connect", "k8s", "configmap-handler", err) + if !s.connected.CompareAndSwap(false, true) { + return nil // already connected } - if _, err := secInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { s.onSecretAdd(obj) }, - UpdateFunc: func(old, newObj any) { s.onSecretUpdate(old, newObj) }, - DeleteFunc: func(obj any) { s.onSecretDelete(obj) }, - }); err != nil { - return config.WrapStoreError("connect", "k8s", "secret-handler", err) + events, err := s.client.Watch(context.Background(), s.opts.k8sNamespace) + if err != nil { + s.connected.Store(false) + return config.WrapStoreError("connect", "k8s", "watch", err) } - s.factory.Start(s.stopChan) + s.watchCtx, s.watchCancel = context.WithCancel(context.Background()) + s.watchDone = make(chan struct{}) - if !cache.WaitForCacheSync(ctx.Done(), cmInformer.HasSynced, secInformer.HasSynced) { - err := ctx.Err() - if err == nil { - err = fmt.Errorf("cache sync timed out") - } - return config.WrapStoreError("connect", "k8s", "cache-sync", err) - } + go s.runWatch(events) return nil } -// Close stops the informer factory and closes all watch channels. -// Close blocks until informer goroutines have drained so callers can -// safely release any resources they injected into the client. +// Close stops the watch goroutine and closes all subscriber channels. +// Close blocks until the watch goroutine has exited. func (s *Store) Close(_ context.Context) error { if s.closed.Swap(true) { return nil // already closed } - - close(s.stopChan) - - // Shutdown blocks until all informers have fully stopped. It is safe - // to call even if Start was never invoked. - if s.factory != nil { - s.factory.Shutdown() + if s.watchCancel != nil { + s.watchCancel() + } + if s.watchDone != nil { + <-s.watchDone } s.watchMu.Lock() @@ -181,12 +167,11 @@ func (s *Store) Close(_ context.Context) error { } // Get retrieves a configuration value by namespace and key. -// Reads from the informer cache (no API call). func (s *Store) Get(ctx context.Context, namespace, key string) (config.Value, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -198,47 +183,28 @@ func (s *Store) Get(ctx context.Context, namespace, key string) (config.Value, e k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - secName := secretResourceName(namespace) - lister := s.factory.Core().V1().Secrets().Lister() - secret, err := lister.Secrets(k8sNS).Get(secName) - if err != nil { - if apierrors.IsNotFound(err) { - return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - return nil, config.WrapStoreError("get", "k8s", key, err) - } - rawBytes, ok := secret.Data[k8sKey] - if !ok { - return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - return makeValueFromBytes(rawBytes, namespace, key, secret.ResourceVersion, resolveCodec(secret.Annotations, k8sKey)) - } - - cmName := configMapResourceName(namespace) - lister := s.factory.Core().V1().ConfigMaps().Lister() - cm, err := lister.ConfigMaps(k8sNS).Get(cmName) + r, err := s.client.Get(ctx, kind, k8sNS, name) if err != nil { - if apierrors.IsNotFound(err) { + if errors.Is(err, ErrNotFound) { return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} } return nil, config.WrapStoreError("get", "k8s", key, err) } - rawStr, ok := cm.Data[k8sKey] + raw, ok := r.Data[k8sKey] if !ok { return nil, &config.KeyNotFoundError{Key: key, Namespace: namespace} } - return makeValueFromString(rawStr, namespace, key, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) + return makeValueFromBytes(ctx, raw, namespace, key, r.ResourceVersion, resolveCodec(r.Annotations, k8sKey)) } // Set creates or updates a configuration value. -// Writes directly to the Kubernetes API (not via informer cache). func (s *Store) Set(ctx context.Context, namespace, key string, value config.Value) (config.Value, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -256,89 +222,19 @@ func (s *Store) Set(ctx context.Context, namespace, key string, value config.Val k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) codecName := value.Codec() + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - return s.setSecret(ctx, namespace, key, k8sNS, k8sKey, codecName, data) - } - return s.setConfigMap(ctx, namespace, key, k8sNS, k8sKey, codecName, string(data)) -} - -func (s *Store) setConfigMap(ctx context.Context, namespace, key, k8sNS, k8sKey, codecName, strVal string) (config.Value, error) { - cmName := configMapResourceName(namespace) - cm, err := s.client.CoreV1().ConfigMaps(k8sNS).Get(ctx, cmName, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { + existing, err := s.client.Get(ctx, kind, k8sNS, name) + if err != nil && !errors.Is(err, ErrNotFound) { return nil, config.WrapStoreError("set", "k8s", key, err) } - if err != nil { - // Create a new ConfigMap. - cm = &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: cmName, - Namespace: k8sNS, - Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, - }, - Data: map[string]string{k8sKey: strVal}, - } - created, createErr := s.client.CoreV1().ConfigMaps(k8sNS).Create(ctx, cm, metav1.CreateOptions{}) - if createErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, createErr) - } - return makeValueFromString(strVal, namespace, key, created.ResourceVersion, codecName) - } - - if cm.Data == nil { - cm.Data = make(map[string]string) - } - if cm.Annotations == nil { - cm.Annotations = make(map[string]string) - } - cm.Data[k8sKey] = strVal - cm.Annotations[codecAnnotationKey(k8sKey)] = codecName - updated, updateErr := s.client.CoreV1().ConfigMaps(k8sNS).Update(ctx, cm, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, updateErr) - } - return makeValueFromString(strVal, namespace, key, updated.ResourceVersion, codecName) -} - -func (s *Store) setSecret(ctx context.Context, namespace, key, k8sNS, k8sKey, codecName string, rawBytes []byte) (config.Value, error) { - secName := secretResourceName(namespace) - sec, err := s.client.CoreV1().Secrets(k8sNS).Get(ctx, secName, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return nil, config.WrapStoreError("set", "k8s", key, err) - } + r := mergeResource(existing, name, k8sKey, data, codecName) + updated, err := s.client.Upsert(ctx, kind, k8sNS, r) if err != nil { - // Create a new Secret. - sec = &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: secName, - Namespace: k8sNS, - Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, - }, - Data: map[string][]byte{k8sKey: rawBytes}, - } - created, createErr := s.client.CoreV1().Secrets(k8sNS).Create(ctx, sec, metav1.CreateOptions{}) - if createErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, createErr) - } - return makeValueFromBytes(rawBytes, namespace, key, created.ResourceVersion, codecName) - } - - if sec.Data == nil { - sec.Data = make(map[string][]byte) - } - if sec.Annotations == nil { - sec.Annotations = make(map[string]string) - } - sec.Data[k8sKey] = rawBytes - sec.Annotations[codecAnnotationKey(k8sKey)] = codecName - - updated, updateErr := s.client.CoreV1().Secrets(k8sNS).Update(ctx, sec, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, config.WrapStoreError("set", "k8s", key, updateErr) + return nil, config.WrapStoreError("set", "k8s", key, err) } - return makeValueFromBytes(rawBytes, namespace, key, updated.ResourceVersion, codecName) + return makeValueFromBytes(ctx, data, namespace, key, updated.ResourceVersion, codecName) } // Delete removes a configuration entry. @@ -346,7 +242,7 @@ func (s *Store) Delete(ctx context.Context, namespace, key string) error { if s.closed.Load() { return config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -358,62 +254,40 @@ func (s *Store) Delete(ctx context.Context, namespace, key string) error { k8sNS := s.k8sNamespace(namespace) k8sKey := configKeyToK8sKey(key) + kind, name := s.resourceFor(namespace, key) - if isSecretKey(key, s.opts.secretPrefix) { - return s.deleteFromSecret(ctx, namespace, key, k8sNS, k8sKey) - } - return s.deleteFromConfigMap(ctx, namespace, key, k8sNS, k8sKey) -} - -func (s *Store) deleteFromConfigMap(ctx context.Context, namespace, key, k8sNS, k8sKey string) error { - cmName := configMapResourceName(namespace) - cm, err := s.client.CoreV1().ConfigMaps(k8sNS).Get(ctx, cmName, metav1.GetOptions{}) + existing, err := s.client.Get(ctx, kind, k8sNS, name) if err != nil { - if apierrors.IsNotFound(err) { + if errors.Is(err, ErrNotFound) { return &config.KeyNotFoundError{Key: key, Namespace: namespace} } return config.WrapStoreError("delete", "k8s", key, err) } - if _, ok := cm.Data[k8sKey]; !ok { + if _, ok := existing.Data[k8sKey]; !ok { return &config.KeyNotFoundError{Key: key, Namespace: namespace} } - delete(cm.Data, k8sKey) - delete(cm.Annotations, codecAnnotationKey(k8sKey)) - _, updateErr := s.client.CoreV1().ConfigMaps(k8sNS).Update(ctx, cm, metav1.UpdateOptions{}) - if updateErr != nil { - return config.WrapStoreError("delete", "k8s", key, updateErr) + delete(existing.Data, k8sKey) + if existing.Annotations != nil { + delete(existing.Annotations, codecAnnotationKey(k8sKey)) } - return nil -} - -func (s *Store) deleteFromSecret(ctx context.Context, namespace, key, k8sNS, k8sKey string) error { - secName := secretResourceName(namespace) - sec, err := s.client.CoreV1().Secrets(k8sNS).Get(ctx, secName, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - return &config.KeyNotFoundError{Key: key, Namespace: namespace} - } + if _, err := s.client.Upsert(ctx, kind, k8sNS, existing); err != nil { return config.WrapStoreError("delete", "k8s", key, err) } - if _, ok := sec.Data[k8sKey]; !ok { - return &config.KeyNotFoundError{Key: key, Namespace: namespace} - } - delete(sec.Data, k8sKey) - delete(sec.Annotations, codecAnnotationKey(k8sKey)) - _, updateErr := s.client.CoreV1().Secrets(k8sNS).Update(ctx, sec, metav1.UpdateOptions{}) - if updateErr != nil { - return config.WrapStoreError("delete", "k8s", key, updateErr) - } return nil } // Find returns a page of configuration entries matching the filter. -// Reads from the informer cache (no API call). +// +// In Keys mode (filter.Keys() non-empty), each key is resolved individually +// and secret-prefixed keys are read from the namespace Secret as expected. +// In Prefix/cursor mode, only the namespace ConfigMap is scanned; Secret +// data keys are not enumerated. Use Keys mode to read secret-prefixed keys +// in bulk. func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter) (config.Page, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return nil, config.ErrStoreNotConnected } if err := config.ValidateNamespace(namespace); err != nil { @@ -423,10 +297,7 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter filter = config.NewFilter().Build() } - k8sNS := s.k8sNamespace(namespace) - cmName := configMapResourceName(namespace) - - // Keys mode: exact key lookup. + // Keys mode: per-key Get (handles secret routing automatically). if keys := filter.Keys(); len(keys) > 0 { results := make(map[string]config.Value) for _, key := range keys { @@ -438,34 +309,35 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter return config.NewPage(results, "", 0), nil } - // Prefix mode: scan ConfigMap data. prefix := filter.Prefix() cursor := filter.Cursor() limit := filter.Limit() - lister := s.factory.Core().V1().ConfigMaps().Lister() - cm, err := lister.ConfigMaps(k8sNS).Get(cmName) + k8sNS := s.k8sNamespace(namespace) + cmName := configMapResourceName(namespace) + + cm, err := s.client.Get(ctx, KindConfigMap, k8sNS, cmName) if err != nil { - // No ConfigMap yet — return empty page. - return config.NewPage(make(map[string]config.Value), "", limit), nil + if errors.Is(err, ErrNotFound) { + return config.NewPage(make(map[string]config.Value), "", limit), nil + } + return nil, config.WrapStoreError("find", "k8s", "", err) } - // Collect and sort config keys for consistent pagination. type kv struct { configKey string - rawVal string + raw []byte } var matches []kv - for k8sKey, rawVal := range cm.Data { + for k8sKey, raw := range cm.Data { configKey := k8sKeyToConfigKey(k8sKey) if prefix != "" && !strings.HasPrefix(configKey, prefix) { continue } - // Cursor is the last config key from the previous page. if cursor != "" && configKey <= cursor { continue } - matches = append(matches, kv{configKey: configKey, rawVal: rawVal}) + matches = append(matches, kv{configKey: configKey, raw: raw}) } sort.Slice(matches, func(i, j int) bool { @@ -480,7 +352,7 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter var lastKey string for _, m := range matches { k8sKey := configKeyToK8sKey(m.configKey) - v, valErr := makeValueFromString(m.rawVal, namespace, m.configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) + v, valErr := makeValueFromBytes(ctx, m.raw, namespace, m.configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) if valErr == nil { results[m.configKey] = v lastKey = m.configKey @@ -491,10 +363,16 @@ func (s *Store) Find(ctx context.Context, namespace string, filter config.Filter } // Watch returns a channel that receives change events for matching keys. +// Returns ErrStoreNotConnected if called before Connect. Events are dropped +// when the per-subscriber buffer is full (size set via WithWatchBufferSize). +// The channel is closed when ctx is cancelled or the Store is closed. func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan config.ChangeEvent, error) { if s.closed.Load() { return nil, config.ErrStoreClosed } + if !s.connected.Load() { + return nil, config.ErrStoreNotConnected + } ctx, cancel := context.WithCancel(ctx) ch := make(chan config.ChangeEvent, s.opts.watchBufSize) @@ -513,7 +391,7 @@ func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan co go func() { select { case <-ctx.Done(): - case <-s.stopChan: + case <-s.watchCtx.Done(): } s.watchMu.Lock() @@ -531,220 +409,105 @@ func (s *Store) Watch(ctx context.Context, filter config.WatchFilter) (<-chan co return ch, nil } -// Health performs a basic health check by listing namespaces. +// Health performs a health check by delegating to the Client. func (s *Store) Health(ctx context.Context) error { if s.closed.Load() { return config.ErrStoreClosed } - if s.factory == nil { + if !s.connected.Load() { return config.ErrStoreNotConnected } - _, err := s.client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{Limit: 1}) - if err != nil { + if err := s.client.Health(ctx); err != nil { return config.WrapStoreError("health", "k8s", "", err) } return nil } -// --------------------------------------------------------------------------- -// Informer event handlers -// --------------------------------------------------------------------------- - -func (s *Store) onConfigMapAdd(obj any) { - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - return - } - ns := s.configNamespace(cm.Namespace, cm.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey, rawVal := range cm.Data { - configKey := k8sKeyToConfigKey(k8sKey) - v, err := makeValueFromString(rawVal, ns, configKey, cm.ResourceVersion, resolveCodec(cm.Annotations, k8sKey)) - if err != nil { - continue +// runWatch consumes events from the Client and dispatches per-key change events +// to subscribers. It exits when watchCtx is cancelled or events is closed. +func (s *Store) runWatch(events <-chan Event) { + defer close(s.watchDone) + for { + select { + case <-s.watchCtx.Done(): + return + case ev, ok := <-events: + if !ok { + return + } + s.dispatchEvent(ev) } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) } } -func (s *Store) onConfigMapUpdate(old, newObj any) { - oldCM, ok1 := old.(*corev1.ConfigMap) - newCM, ok2 := newObj.(*corev1.ConfigMap) - if !ok1 || !ok2 { - return - } - ns := s.configNamespace(newCM.Namespace, newCM.Name) - if ns == "" { - return - } +// dispatchEvent translates a single resource-level Event into per-key +// config.ChangeEvents and delivers them to matching watchers. +func (s *Store) dispatchEvent(ev Event) { now := time.Now().UTC() - - // Keys added or updated. - for k8sKey, newVal := range newCM.Data { - configKey := k8sKeyToConfigKey(k8sKey) - oldVal, existed := oldCM.Data[k8sKey] - if !existed || oldVal != newVal { - v, err := makeValueFromString(newVal, ns, configKey, newCM.ResourceVersion, resolveCodec(newCM.Annotations, k8sKey)) - if err != nil { - continue - } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) - } + ns := s.configNamespace(ev) + if ns == "" { + return // not a resource we manage } - // Keys deleted. - for k8sKey := range oldCM.Data { - if _, ok := newCM.Data[k8sKey]; !ok { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } - } -} - -func (s *Store) onConfigMapDelete(obj any) { - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - // Handle DeletedFinalStateUnknown tombstone. - tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown) - if !isTombstone { + switch ev.Type { + case EventAdd: + if ev.New == nil { return } - cm, ok = tombstone.Obj.(*corev1.ConfigMap) - if !ok { + for k8sKey, raw := range ev.New.Data { + s.emitSet(ns, ev.New, k8sKey, raw, now) + } + case EventUpdate: + if ev.New == nil { return } - } - ns := s.configNamespace(cm.Namespace, cm.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey := range cm.Data { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } -} - -func (s *Store) onSecretAdd(obj any) { - sec, ok := obj.(*corev1.Secret) - if !ok { - return - } - ns := s.configNamespace(sec.Namespace, sec.Name) - if ns == "" { - return - } - now := time.Now().UTC() - for k8sKey, rawBytes := range sec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - v, err := makeValueFromBytes(rawBytes, ns, configKey, sec.ResourceVersion, resolveCodec(sec.Annotations, k8sKey)) - if err != nil { - continue + var oldData map[string][]byte + if ev.Old != nil { + oldData = ev.Old.Data } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) - } -} - -func (s *Store) onSecretUpdate(old, newObj any) { - oldSec, ok1 := old.(*corev1.Secret) - newSec, ok2 := newObj.(*corev1.Secret) - if !ok1 || !ok2 { - return - } - ns := s.configNamespace(newSec.Namespace, newSec.Name) - if ns == "" { - return - } - now := time.Now().UTC() - - for k8sKey, newBytes := range newSec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - oldBytes, existed := oldSec.Data[k8sKey] - if !existed || string(oldBytes) != string(newBytes) { - v, err := makeValueFromBytes(newBytes, ns, configKey, newSec.ResourceVersion, resolveCodec(newSec.Annotations, k8sKey)) - if err != nil { - continue + for k8sKey, newRaw := range ev.New.Data { + oldRaw, existed := oldData[k8sKey] + if !existed || !bytes.Equal(oldRaw, newRaw) { + s.emitSet(ns, ev.New, k8sKey, newRaw, now) } - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeSet, - Namespace: ns, - Key: configKey, - Value: v, - Timestamp: now, - }) } - } - - for k8sKey := range oldSec.Data { - if _, ok := newSec.Data[k8sKey]; !ok { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) + for k8sKey := range oldData { + if _, ok := ev.New.Data[k8sKey]; !ok { + s.emitDelete(ns, k8sKey, now) + } } - } -} - -func (s *Store) onSecretDelete(obj any) { - sec, ok := obj.(*corev1.Secret) - if !ok { - tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown) - if !isTombstone { + case EventDelete: + if ev.Old == nil { return } - sec, ok = tombstone.Obj.(*corev1.Secret) - if !ok { - return + for k8sKey := range ev.Old.Data { + s.emitDelete(ns, k8sKey, now) } } - ns := s.configNamespace(sec.Namespace, sec.Name) - if ns == "" { +} + +func (s *Store) emitSet(ns string, r *Resource, k8sKey string, raw []byte, ts time.Time) { + configKey := k8sKeyToConfigKey(k8sKey) + v, err := makeValueFromBytes(context.Background(), raw, ns, configKey, r.ResourceVersion, resolveCodec(r.Annotations, k8sKey)) + if err != nil { return } - now := time.Now().UTC() - for k8sKey := range sec.Data { - configKey := k8sKeyToConfigKey(k8sKey) - s.notifyWatchers(config.ChangeEvent{ - Type: config.ChangeTypeDelete, - Namespace: ns, - Key: configKey, - Timestamp: now, - }) - } + s.notifyWatchers(config.ChangeEvent{ + Type: config.ChangeTypeSet, + Namespace: ns, + Key: configKey, + Value: v, + Timestamp: ts, + }) +} + +func (s *Store) emitDelete(ns, k8sKey string, ts time.Time) { + s.notifyWatchers(config.ChangeEvent{ + Type: config.ChangeTypeDelete, + Namespace: ns, + Key: k8sKeyToConfigKey(k8sKey), + Timestamp: ts, + }) } // --------------------------------------------------------------------------- @@ -752,8 +515,6 @@ func (s *Store) onSecretDelete(obj any) { // --------------------------------------------------------------------------- // k8sNamespace returns the Kubernetes namespace to use for a given config namespace. -// If opts.k8sNamespace is set, all config namespaces map to it. -// Otherwise the config namespace name is used directly. func (s *Store) k8sNamespace(configNS string) string { if s.opts.k8sNamespace != "" { return s.opts.k8sNamespace @@ -764,14 +525,31 @@ func (s *Store) k8sNamespace(configNS string) string { return configNS } -// configNamespace reverses the k8s resource name back to the config namespace. -// Returns "" if the resource is not managed by this store. -func (s *Store) configNamespace(k8sNS, resourceName string) string { - if strings.HasPrefix(resourceName, secretNamePrefix) { - return strings.TrimPrefix(resourceName, secretNamePrefix) +// resourceFor returns the kind and resource name backing the given config key. +func (s *Store) resourceFor(namespace, key string) (Kind, string) { + if isSecretKey(key, s.opts.secretPrefix) { + return KindSecret, secretResourceName(namespace) } - if strings.HasPrefix(resourceName, configMapPrefix) { - return strings.TrimPrefix(resourceName, configMapPrefix) + return KindConfigMap, configMapResourceName(namespace) +} + +// configNamespace recovers the config namespace from a watched event. +// Returns "" if the resource is not managed by this store. +func (s *Store) configNamespace(ev Event) string { + var name string + switch { + case ev.New != nil: + name = ev.New.Name + case ev.Old != nil: + name = ev.Old.Name + default: + return "" + } + if rest, ok := strings.CutPrefix(name, secretNamePrefix); ok { + return rest + } + if rest, ok := strings.CutPrefix(name, configMapPrefix); ok { + return rest } return "" } @@ -802,34 +580,37 @@ func isSecretKey(key, prefix string) bool { return prefix != "" && strings.HasPrefix(key, prefix) } -// makeValueFromString creates a config.Value from a ConfigMap string value. -// codecName should be the value's codec as recorded in the ConfigMap annotations; -// fall back to "json" if unknown. -func makeValueFromString(rawStr, namespace, key, resourceVersion, codecName string) (config.Value, error) { - if codecName == "" { - codecName = "json" +// mergeResource constructs a Resource for an Upsert: clones existing or builds +// a new one with the given data key and codec annotation set. +func mergeResource(existing *Resource, name, k8sKey string, raw []byte, codecName string) *Resource { + if existing == nil { + return &Resource{ + Name: name, + Annotations: map[string]string{codecAnnotationKey(k8sKey): codecName}, + Data: map[string][]byte{k8sKey: raw}, + } } - version := parseResourceVersion(resourceVersion) - entryID := namespace + "/" + key - return config.NewValueFromBytes( - context.Background(), - []byte(rawStr), - codecName, - config.WithValueMetadata(version, time.Time{}, time.Time{}), - config.WithValueEntryID(entryID), - ) + if existing.Data == nil { + existing.Data = make(map[string][]byte) + } + if existing.Annotations == nil { + existing.Annotations = make(map[string]string) + } + existing.Data[k8sKey] = raw + existing.Annotations[codecAnnotationKey(k8sKey)] = codecName + return existing } -// makeValueFromBytes creates a config.Value from a Secret byte value. -func makeValueFromBytes(rawBytes []byte, namespace, key, resourceVersion, codecName string) (config.Value, error) { +// makeValueFromBytes builds a config.Value from raw bytes plus k8s metadata. +func makeValueFromBytes(ctx context.Context, raw []byte, namespace, key, resourceVersion, codecName string) (config.Value, error) { if codecName == "" { codecName = "json" } version := parseResourceVersion(resourceVersion) entryID := namespace + "/" + key return config.NewValueFromBytes( - context.Background(), - rawBytes, + ctx, + raw, codecName, config.WithValueMetadata(version, time.Time{}, time.Time{}), config.WithValueEntryID(entryID), diff --git a/k8s/store_test.go b/k8s/store_test.go new file mode 100644 index 0000000..711eeea --- /dev/null +++ b/k8s/store_test.go @@ -0,0 +1,310 @@ +package k8s + +import ( + "context" + "errors" + "maps" + "sync" + "testing" + "time" + + "github.com/rbaliyan/config" + _ "github.com/rbaliyan/config/codec/json" +) + +// fakeClient is an in-memory Client used by tests. It mimics enough of +// kubernetes' ConfigMap/Secret semantics to exercise Store end-to-end. +type fakeClient struct { + mu sync.Mutex + cms map[string]map[string]*Resource // namespace -> name -> resource + secrets map[string]map[string]*Resource + rv int + events chan Event + healthOK bool +} + +func newFakeClient() *fakeClient { + return &fakeClient{ + cms: map[string]map[string]*Resource{}, + secrets: map[string]map[string]*Resource{}, + events: make(chan Event, 32), + healthOK: true, + } +} + +func (f *fakeClient) bucket(kind Kind) map[string]map[string]*Resource { + if kind == KindSecret { + return f.secrets + } + return f.cms +} + +func (f *fakeClient) Get(_ context.Context, kind Kind, namespace, name string) (*Resource, error) { + f.mu.Lock() + defer f.mu.Unlock() + ns := f.bucket(kind)[namespace] + if ns == nil { + return nil, ErrNotFound + } + r := ns[name] + if r == nil { + return nil, ErrNotFound + } + return cloneResource(r), nil +} + +func (f *fakeClient) Upsert(_ context.Context, kind Kind, namespace string, r *Resource) (*Resource, error) { + f.mu.Lock() + defer f.mu.Unlock() + bucket := f.bucket(kind) + if bucket[namespace] == nil { + bucket[namespace] = map[string]*Resource{} + } + old := bucket[namespace][r.Name] + f.rv++ + stored := cloneResource(r) + stored.ResourceVersion = itoa(f.rv) + bucket[namespace][r.Name] = stored + + ev := Event{Kind: kind, Namespace: namespace, New: cloneResource(stored)} + if old == nil { + ev.Type = EventAdd + } else { + ev.Type = EventUpdate + ev.Old = cloneResource(old) + } + select { + case f.events <- ev: + default: + } + return cloneResource(stored), nil +} + +func (f *fakeClient) Watch(ctx context.Context, _ string) (<-chan Event, error) { + out := make(chan Event, 32) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-f.events: + if !ok { + return + } + select { + case out <- ev: + case <-ctx.Done(): + return + } + } + } + }() + return out, nil +} + +func (f *fakeClient) Health(_ context.Context) error { + if !f.healthOK { + return errors.New("unhealthy") + } + return nil +} + +func cloneResource(r *Resource) *Resource { + if r == nil { + return nil + } + out := &Resource{ + Name: r.Name, + ResourceVersion: r.ResourceVersion, + } + if r.Annotations != nil { + out.Annotations = make(map[string]string, len(r.Annotations)) + maps.Copy(out.Annotations, r.Annotations) + } + if r.Data != nil { + out.Data = make(map[string][]byte, len(r.Data)) + for k, v := range r.Data { + b := make([]byte, len(v)) + copy(b, v) + out.Data[k] = b + } + } + return out +} + +func itoa(n int) string { + const digits = "0123456789" + if n == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = digits[n%10] + n /= 10 + } + return string(buf[i:]) +} + +func mustConnect(t *testing.T, s *Store) { + t.Helper() + if err := s.Connect(context.Background()); err != nil { + t.Fatalf("Connect: %v", err) + } + t.Cleanup(func() { _ = s.Close(context.Background()) }) +} + +func TestStore_SetGetDelete_ConfigMap(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + val := config.NewValue(map[string]any{"host": "localhost", "port": 5432}) + stored, err := s.Set(ctx, "prod", "db/conn", val) + if err != nil { + t.Fatalf("Set: %v", err) + } + if stored.Metadata().Version() == 0 { + t.Errorf("expected non-zero version, got 0") + } + + got, err := s.Get(ctx, "prod", "db/conn") + if err != nil { + t.Fatalf("Get: %v", err) + } + var out map[string]any + if err := got.Unmarshal(ctx, &out); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if out["host"] != "localhost" { + t.Errorf("expected localhost, got %v", out["host"]) + } + + if err := s.Delete(ctx, "prod", "db/conn"); err != nil { + t.Fatalf("Delete: %v", err) + } + if _, err := s.Get(ctx, "prod", "db/conn"); !config.IsNotFound(err) { + t.Errorf("expected not found, got %v", err) + } +} + +func TestStore_SecretRouting(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + val := config.NewValue("super-secret") + if _, err := s.Set(ctx, "prod", "secret/api-key", val); err != nil { + t.Fatalf("Set: %v", err) + } + if _, ok := fc.secrets["prod"]["config-secrets-prod"]; !ok { + t.Errorf("expected secret resource to exist; secrets=%+v", fc.secrets) + } + if _, ok := fc.cms["prod"]["config-prod"]; ok { + t.Errorf("did not expect configmap for secret-prefixed key") + } +} + +func TestStore_GetMissing(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + if _, err := s.Get(ctx, "prod", "missing/key"); !config.IsNotFound(err) { + t.Errorf("expected not-found, got %v", err) + } +} + +func TestStore_Find_PrefixAndCursor(t *testing.T) { + ctx := context.Background() + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + for _, k := range []string{"app/a", "app/b", "app/c", "other/x"} { + if _, err := s.Set(ctx, "prod", k, config.NewValue(k)); err != nil { + t.Fatalf("Set %s: %v", k, err) + } + } + + page, err := s.Find(ctx, "prod", config.NewFilter().WithPrefix("app/").WithLimit(2).Build()) + if err != nil { + t.Fatalf("Find: %v", err) + } + if len(page.Results()) != 2 { + t.Errorf("expected 2 results, got %d", len(page.Results())) + } + if page.NextCursor() != "app/b" { + t.Errorf("expected cursor app/b, got %q", page.NextCursor()) + } + + next, err := s.Find(ctx, "prod", config.NewFilter().WithPrefix("app/").WithLimit(2).WithCursor(page.NextCursor()).Build()) + if err != nil { + t.Fatalf("Find page2: %v", err) + } + if _, ok := next.Results()["app/c"]; !ok { + t.Errorf("expected app/c in second page; got %+v", next.Results()) + } +} + +func TestStore_Watch_DeliversChanges(t *testing.T) { + ctx := t.Context() + + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + + ch, err := s.Watch(ctx, config.WatchFilter{Namespaces: []string{"prod"}}) + if err != nil { + t.Fatalf("Watch: %v", err) + } + + if _, err := s.Set(ctx, "prod", "feature/flag", config.NewValue(true)); err != nil { + t.Fatalf("Set: %v", err) + } + + select { + case ev := <-ch: + if ev.Type != config.ChangeTypeSet || ev.Key != "feature/flag" || ev.Namespace != "prod" { + t.Errorf("unexpected event: %+v", ev) + } + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for watch event") + } +} + +func TestStore_Health(t *testing.T) { + fc := newFakeClient() + s := NewStore(fc) + mustConnect(t, s) + if err := s.Health(context.Background()); err != nil { + t.Errorf("Health: %v", err) + } + fc.healthOK = false + if err := s.Health(context.Background()); err == nil { + t.Errorf("expected unhealthy error") + } +} + +func TestStore_NotConnected(t *testing.T) { + s := NewStore(newFakeClient()) + if _, err := s.Get(context.Background(), "prod", "k"); !errors.Is(err, config.ErrStoreNotConnected) { + t.Errorf("expected ErrStoreNotConnected, got %v", err) + } +} + +func TestStore_AfterClose(t *testing.T) { + s := NewStore(newFakeClient()) + mustConnect(t, s) + if err := s.Close(context.Background()); err != nil { + t.Fatalf("Close: %v", err) + } + if _, err := s.Get(context.Background(), "prod", "k"); !errors.Is(err, config.ErrStoreClosed) { + t.Errorf("expected ErrStoreClosed, got %v", err) + } +} diff --git a/k8s/watch.go b/k8s/watch.go index a6cdef2..8e661db 100644 --- a/k8s/watch.go +++ b/k8s/watch.go @@ -7,6 +7,7 @@ import ( "github.com/rbaliyan/config" ) +// watchEntry represents a single subscriber to Store.Watch. type watchEntry struct { filter config.WatchFilter ch chan config.ChangeEvent @@ -37,7 +38,7 @@ func (s *Store) notifyWatchers(event config.ChangeEvent) { } } -// sendToWatcher safely sends an event to a watcher, handling closed channels. +// sendToWatcher safely sends an event to a watcher, dropping on a full channel. func (s *Store) sendToWatcher(we *watchEntry, event config.ChangeEvent) { we.mu.Lock() defer we.mu.Unlock()