diff --git a/.chloggen/fix_remove-optional-resources.yaml b/.chloggen/fix_remove-optional-resources.yaml new file mode 100755 index 0000000000..f5f73cf935 --- /dev/null +++ b/.chloggen/fix_remove-optional-resources.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: collector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix deletion of optional resources for OpenTelemetryCollector CRs + +# One or more tracking issues related to the change +issues: [3454] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/controllers/common.go b/controllers/common.go index 25bdc0c432..1dbea9da0b 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -21,8 +21,8 @@ import ( "github.com/go-logr/logr" rbacv1 "k8s.io/api/rbac/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -119,18 +119,32 @@ func BuildTargetAllocator(params targetallocator.Params) ([]client.Object, error // getList queries the Kubernetes API to list the requested resource, setting the list l of type T. func getList[T client.Object](ctx context.Context, cl client.Client, l T, options ...client.ListOption) (map[types.UID]client.Object, error) { ownedObjects := map[types.UID]client.Object{} - list := &unstructured.UnstructuredList{} gvk, err := apiutil.GVKForObject(l, cl.Scheme()) if err != nil { return nil, err } - list.SetGroupVersionKind(gvk) - err = cl.List(ctx, list, options...) + gvk.Kind = fmt.Sprintf("%sList", gvk.Kind) + list, err := cl.Scheme().New(gvk) + if err != nil { + return nil, fmt.Errorf("unable to list objects of type %s: %w", gvk.Kind, err) + } + + objList := list.(client.ObjectList) + + err = cl.List(ctx, objList, options...) if err != nil { return ownedObjects, fmt.Errorf("error listing %T: %w", l, err) } - for i := range list.Items { - ownedObjects[list.Items[i].GetUID()] = &list.Items[i] + objs, err := apimeta.ExtractList(objList) + if err != nil { + return ownedObjects, fmt.Errorf("error listing %T: %w", l, err) + } + for i := range objs { + typedObj, ok := objs[i].(T) + if !ok { + return ownedObjects, fmt.Errorf("error listing %T: %w", l, err) + } + ownedObjects[typedObj.GetUID()] = typedObj } return ownedObjects, nil } diff --git a/controllers/opentelemetrycollector_controller.go b/controllers/opentelemetrycollector_controller.go index 1f0211f932..9f6d063f8b 100644 --- a/controllers/opentelemetrycollector_controller.go +++ b/controllers/opentelemetrycollector_controller.go @@ -17,7 +17,6 @@ package controllers import ( "context" - "fmt" "sort" "github.com/go-logr/logr" @@ -30,12 +29,14 @@ import ( policyV1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" @@ -53,6 +54,8 @@ import ( "github.com/open-telemetry/opentelemetry-operator/pkg/featuregate" ) +const resourceOwnerKey = ".metadata.owner" + var ( ownedClusterObjectTypes = []client.Object{ &rbacv1.ClusterRole{}, @@ -82,51 +85,42 @@ type Params struct { func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Context, params manifests.Params) (map[types.UID]client.Object, error) { ownedObjects := map[types.UID]client.Object{} - ownedObjectTypes := []client.Object{ - &autoscalingv2.HorizontalPodAutoscaler{}, - &networkingv1.Ingress{}, - &policyV1.PodDisruptionBudget{}, - } - listOps := &client.ListOptions{ - Namespace: params.OtelCol.Namespace, - LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)), - } - if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available { - ownedObjectTypes = append(ownedObjectTypes, - &monitoringv1.ServiceMonitor{}, - &monitoringv1.PodMonitor{}, - ) - } - if params.Config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable { - ownedObjectTypes = append(ownedObjectTypes, &routev1.Route{}) + collectorConfigMaps := []*corev1.ConfigMap{} + ownedObjectTypes := r.GetOwnedResourceTypes() + listOpts := []client.ListOption{ + client.InNamespace(params.OtelCol.Namespace), + client.MatchingFields{resourceOwnerKey: params.OtelCol.Name}, } for _, objectType := range ownedObjectTypes { - objs, err := getList(ctx, r, objectType, listOps) + objs, err := getList(ctx, r, objectType, listOpts...) if err != nil { return nil, err } for uid, object := range objs { ownedObjects[uid] = object } - } - if params.Config.CreateRBACPermissions() == rbac.Available { - objs, err := r.findClusterRoleObjects(ctx, params) - if err != nil { - return nil, err - } - for uid, object := range objs { - ownedObjects[uid] = object + // save Collector ConfigMaps into a separate slice, we need to do additional filtering on them + switch objectType.(type) { + case *corev1.ConfigMap: + for _, object := range objs { + if !featuregate.CollectorUsesTargetAllocatorCR.IsEnabled() && object.GetLabels()["app.kubernetes.io/component"] != "opentelemetry-collector" { + // we only apply this to collector ConfigMaps + continue + } + configMap := object.(*corev1.ConfigMap) + collectorConfigMaps = append(collectorConfigMaps, configMap) + } + default: } } - configMapList := &corev1.ConfigMapList{} - err := r.List(ctx, configMapList, listOps) - if err != nil { - return nil, fmt.Errorf("error listing ConfigMaps: %w", err) - } - ownedConfigMaps := r.getConfigMapsToRemove(params.OtelCol.Spec.ConfigVersions, configMapList) - for i := range ownedConfigMaps { - ownedObjects[ownedConfigMaps[i].GetUID()] = &ownedConfigMaps[i] + // at this point we don't know if the most recent ConfigMap will still be the most recent after reconciliation, or + // if a new one will be created. We keep one additional ConfigMap to account for this. The next reconciliation that + // doesn't spawn a new ConfigMap will delete the extra one we kept here. + configVersionsToKeep := max(params.OtelCol.Spec.ConfigVersions, 1) + 1 + configMapsToKeep := getCollectorConfigMapsToKeep(configVersionsToKeep, collectorConfigMaps) + for _, configMap := range configMapsToKeep { + delete(ownedObjects, configMap.GetUID()) } return ownedObjects, nil @@ -138,7 +132,8 @@ func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Co // Remove cluster roles and bindings. // Users might switch off the RBAC creation feature on the operator which should remove existing RBAC. listOpsCluster := &client.ListOptions{ - LabelSelector: labels.SelectorFromSet(manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)), + LabelSelector: labels.SelectorFromSet( + manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, collector.ComponentOpenTelemetryCollector)), } for _, objectType := range ownedClusterObjectTypes { objs, err := getList(ctx, r, objectType, listOpsCluster) @@ -152,25 +147,21 @@ func (r *OpenTelemetryCollectorReconciler) findClusterRoleObjects(ctx context.Co return ownedObjects, nil } -// getConfigMapsToRemove returns a list of ConfigMaps to remove based on the number of ConfigMaps to keep. -// It keeps the newest ConfigMap, the `configVersionsToKeep` next newest ConfigMaps, and returns the remainder. -func (r *OpenTelemetryCollectorReconciler) getConfigMapsToRemove(configVersionsToKeep int, configMapList *corev1.ConfigMapList) []corev1.ConfigMap { +// getCollectorConfigMapsToKeep gets ConfigMaps the controller would normally delete, but which we want to keep around +// anyway. This is part of a feature to keep around previous ConfigMap versions to make rollbacks easier. +// Fundamentally, this just sorts by time created and picks configVersionsToKeep latest ones. +func getCollectorConfigMapsToKeep(configVersionsToKeep int, configMaps []*corev1.ConfigMap) []*corev1.ConfigMap { configVersionsToKeep = max(1, configVersionsToKeep) - ownedConfigMaps := []corev1.ConfigMap{} - sort.Slice(configMapList.Items, func(i, j int) bool { - iTime := configMapList.Items[i].GetCreationTimestamp().Time - jTime := configMapList.Items[j].GetCreationTimestamp().Time + sort.Slice(configMaps, func(i, j int) bool { + iTime := configMaps[i].GetCreationTimestamp().Time + jTime := configMaps[j].GetCreationTimestamp().Time // sort the ConfigMaps newest to oldest return iTime.After(jTime) }) - for i := range configMapList.Items { - if i > configVersionsToKeep { - ownedConfigMaps = append(ownedConfigMaps, configMapList.Items[i]) - } - } - - return ownedConfigMaps + configMapsToKeep := min(configVersionsToKeep, len(configMaps)) + // return the first configVersionsToKeep items + return configMaps[:configMapsToKeep] } func (r *OpenTelemetryCollectorReconciler) GetParams(ctx context.Context, instance v1beta1.OpenTelemetryCollector) (manifests.Params, error) { @@ -310,32 +301,74 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct // SetupWithManager tells the manager what our controller is interested in. func (r *OpenTelemetryCollectorReconciler) SetupWithManager(mgr ctrl.Manager) error { + err := r.SetupCaches(mgr) + if err != nil { + return err + } + + ownedResources := r.GetOwnedResourceTypes() builder := ctrl.NewControllerManagedBy(mgr). - For(&v1beta1.OpenTelemetryCollector{}). - Owns(&corev1.ConfigMap{}). - Owns(&corev1.ServiceAccount{}). - Owns(&corev1.Service{}). - Owns(&appsv1.Deployment{}). - Owns(&appsv1.DaemonSet{}). - Owns(&appsv1.StatefulSet{}). - Owns(&networkingv1.Ingress{}). - Owns(&autoscalingv2.HorizontalPodAutoscaler{}). - Owns(&policyV1.PodDisruptionBudget{}) + For(&v1beta1.OpenTelemetryCollector{}) + + for _, resource := range ownedResources { + builder.Owns(resource) + } + + return builder.Complete(r) +} + +// SetupCaches sets up caching and indexing for our controller. +func (r *OpenTelemetryCollectorReconciler) SetupCaches(cluster cluster.Cluster) error { + ownedResources := r.GetOwnedResourceTypes() + for _, resource := range ownedResources { + if err := cluster.GetCache().IndexField(context.Background(), resource, resourceOwnerKey, func(rawObj client.Object) []string { + owner := metav1.GetControllerOf(rawObj) + if owner == nil { + return nil + } + // make sure it's an OpenTelemetryCollector + if owner.Kind != "OpenTelemetryCollector" { + return nil + } + + return []string{owner.Name} + }); err != nil { + return err + } + } + return nil +} + +// GetOwnedResourceTypes returns all the resource types the controller can own. Even though this method returns an array +// of client.Object, these are (empty) example structs rather than actual resources. +func (r *OpenTelemetryCollectorReconciler) GetOwnedResourceTypes() []client.Object { + ownedResources := []client.Object{ + &corev1.ConfigMap{}, + &corev1.ServiceAccount{}, + &corev1.Service{}, + &appsv1.Deployment{}, + &appsv1.DaemonSet{}, + &appsv1.StatefulSet{}, + &networkingv1.Ingress{}, + &autoscalingv2.HorizontalPodAutoscaler{}, + &policyV1.PodDisruptionBudget{}, + } if r.config.CreateRBACPermissions() == rbac.Available { - builder.Owns(&rbacv1.ClusterRoleBinding{}) - builder.Owns(&rbacv1.ClusterRole{}) + ownedResources = append(ownedResources, &rbacv1.ClusterRole{}) + ownedResources = append(ownedResources, &rbacv1.ClusterRoleBinding{}) } if featuregate.PrometheusOperatorIsAvailable.IsEnabled() && r.config.PrometheusCRAvailability() == prometheus.Available { - builder.Owns(&monitoringv1.ServiceMonitor{}) - builder.Owns(&monitoringv1.PodMonitor{}) + ownedResources = append(ownedResources, &monitoringv1.PodMonitor{}) + ownedResources = append(ownedResources, &monitoringv1.ServiceMonitor{}) } + if r.config.OpenShiftRoutesAvailability() == openshift.RoutesAvailable { - builder.Owns(&routev1.Route{}) + ownedResources = append(ownedResources, &routev1.Route{}) } - return builder.Complete(r) + return ownedResources } const collectorFinalizer = "opentelemetrycollector.opentelemetry.io/finalizer" diff --git a/controllers/opentelemetrycollector_reconciler_test.go b/controllers/opentelemetrycollector_reconciler_test.go new file mode 100644 index 0000000000..d881003309 --- /dev/null +++ b/controllers/opentelemetrycollector_reconciler_test.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetCollectorConfigMapsToKeep(t *testing.T) { + now := time.Now() + testCases := []struct { + name string + versionsToKeep int + input []*corev1.ConfigMap + output []*corev1.ConfigMap + }{ + { + name: "no configmaps", + input: []*corev1.ConfigMap{}, + output: []*corev1.ConfigMap{}, + }, + { + name: "one configmap", + input: []*corev1.ConfigMap{ + {}, + }, + output: []*corev1.ConfigMap{ + {}, + }, + }, + { + name: "two configmaps, keep one", + input: []*corev1.ConfigMap{ + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}}, + }, + output: []*corev1.ConfigMap{ + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}}, + }, + }, + { + name: "three configmaps, keep two", + versionsToKeep: 2, + input: []*corev1.ConfigMap{ + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Minute)}}}, + }, + output: []*corev1.ConfigMap{ + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Minute)}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: now.Add(time.Second)}}}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := getCollectorConfigMapsToKeep(tc.versionsToKeep, tc.input) + assert.Equal(t, tc.output, actualOutput) + }) + } +} diff --git a/controllers/reconcile_test.go b/controllers/reconcile_test.go index a0d6fc3bed..b944d094bf 100644 --- a/controllers/reconcile_test.go +++ b/controllers/reconcile_test.go @@ -16,6 +16,10 @@ package controllers_test import ( "context" + "fmt" + "regexp" + "slices" + "strings" "testing" "time" @@ -30,13 +34,16 @@ import ( policyV1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" k8sconfig "sigs.k8s.io/controller-runtime/pkg/client/config" + runtimecluster "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/manager" k8sreconcile "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -623,18 +630,15 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) { t.Run(tt.name, func(t *testing.T) { testContext := context.Background() nsn := types.NamespacedName{Name: tt.args.params.Name, Namespace: tt.args.params.Namespace} - reconciler := controllers.NewReconciler(controllers.Params{ - Client: k8sClient, - Log: logger, - Scheme: testScheme, - Recorder: record.NewFakeRecorder(20), - Config: config.New( - config.WithCollectorImage("default-collector"), - config.WithTargetAllocatorImage("default-ta-allocator"), - config.WithOpenShiftRoutesAvailability(openshift.RoutesAvailable), - config.WithPrometheusCRAvailability(prometheus.Available), - ), - }) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + reconciler := createTestReconciler(t, testCtx, config.New( + config.WithCollectorImage("default-collector"), + config.WithTargetAllocatorImage("default-ta-allocator"), + config.WithOpenShiftRoutesAvailability(openshift.RoutesAvailable), + config.WithPrometheusCRAvailability(prometheus.Available), + )) assert.True(t, len(tt.want) > 0, "must have at least one group of checks to run") firstCheck := tt.want[0] @@ -697,6 +701,301 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) { } } +// TestOpenTelemetryCollectorReconciler_RemoveDisabled starts off with optional resources enabled, and then disables +// them one by one to ensure they're actually deleted. +func TestOpenTelemetryCollectorReconciler_RemoveDisabled(t *testing.T) { + expectedStartingResourceCount := 11 + startingCollector := &v1beta1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Name: "placeholder", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1beta1.OpenTelemetryCollectorSpec{ + TargetAllocator: v1beta1.TargetAllocatorEmbedded{ + Enabled: true, + PrometheusCR: v1beta1.TargetAllocatorPrometheusCR{ + Enabled: true, + }, + }, + Mode: v1beta1.ModeStatefulSet, + Observability: v1beta1.ObservabilitySpec{ + Metrics: v1beta1.MetricsConfigSpec{ + EnableMetrics: true, + }, + }, + Config: v1beta1.Config{ + Receivers: v1beta1.AnyConfig{ + Object: map[string]interface{}{ + "prometheus": map[string]interface{}{ + "config": map[string]interface{}{ + "scrape_configs": []interface{}{}, + }, + }, + }, + }, + Exporters: v1beta1.AnyConfig{ + Object: map[string]interface{}{ + "nop": map[string]interface{}{}, + }, + }, + Service: v1beta1.Service{ + Pipelines: map[string]*v1beta1.Pipeline{ + "logs": { + Exporters: []string{"nop"}, + Receivers: []string{"nop"}, + }, + }, + }, + }, + }, + } + + testCases := []struct { + name string + mutateCollector func(*v1beta1.OpenTelemetryCollector) + expectedResourcesDeletedCount int + }{ + { + name: "disable targetallocator", + mutateCollector: func(obj *v1beta1.OpenTelemetryCollector) { + obj.Spec.TargetAllocator.Enabled = false + }, + expectedResourcesDeletedCount: 5, + }, + { + name: "disable metrics", + mutateCollector: func(obj *v1beta1.OpenTelemetryCollector) { + obj.Spec.Observability.Metrics.EnableMetrics = false + }, + expectedResourcesDeletedCount: 1, + }, + { + name: "disable default service account", + mutateCollector: func(obj *v1beta1.OpenTelemetryCollector) { + obj.Spec.OpenTelemetryCommonFields.ServiceAccount = "placeholder" + }, + expectedResourcesDeletedCount: 1, + }, + } + + testCtx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + reconciler := createTestReconciler(t, testCtx, config.New( + config.WithCollectorImage("default-collector"), + config.WithTargetAllocatorImage("default-ta-allocator"), + config.WithOpenShiftRoutesAvailability(openshift.RoutesAvailable), + config.WithPrometheusCRAvailability(prometheus.Available), + )) + + // the base query for the underlying objects + opts := []client.ListOption{ + client.InNamespace(startingCollector.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + collectorName := sanitizeResourceName(tc.name) + collector := startingCollector.DeepCopy() + collector.Name = collectorName + nsn := types.NamespacedName{Name: collector.Name, Namespace: collector.Namespace} + clientCtx := context.Background() + err := k8sClient.Create(clientCtx, collector) + require.NoError(t, err) + t.Cleanup(func() { + deleteErr := k8sClient.Delete(clientCtx, collector) + require.NoError(t, deleteErr) + }) + err = k8sClient.Get(clientCtx, nsn, collector) + require.NoError(t, err) + req := k8sreconcile.Request{ + NamespacedName: nsn, + } + _, reconcileErr := reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + list, listErr := getAllOwnedResources(clientCtx, reconciler, collector, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, list) + assert.Len(collect, list, expectedStartingResourceCount) + }, time.Second*5, time.Millisecond) + + err = k8sClient.Get(clientCtx, nsn, collector) + require.NoError(t, err) + tc.mutateCollector(collector) + err = k8sClient.Update(clientCtx, collector) + require.NoError(t, err) + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + actual := &v1beta1.OpenTelemetryCollector{} + err = reconciler.Get(clientCtx, nsn, actual) + assert.NoError(collect, err) + assert.Equal(collect, collector.Spec, actual.Spec) + }, time.Second*5, time.Millisecond) + + _, reconcileErr = reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + expectedResourceCount := expectedStartingResourceCount - tc.expectedResourcesDeletedCount + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + list, listErr := getAllOwnedResources(clientCtx, reconciler, collector, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, list) + assert.Len(collect, list, expectedResourceCount) + }, time.Second*5, time.Millisecond) + }) + } +} + +func TestOpenTelemetryCollectorReconciler_VersionedConfigMaps(t *testing.T) { + collectorName := sanitizeResourceName(t.Name()) + collector := &v1beta1.OpenTelemetryCollector{ + ObjectMeta: metav1.ObjectMeta{ + Name: collectorName, + Namespace: metav1.NamespaceDefault, + }, + Spec: v1beta1.OpenTelemetryCollectorSpec{ + ConfigVersions: 1, + TargetAllocator: v1beta1.TargetAllocatorEmbedded{ + Enabled: true, + PrometheusCR: v1beta1.TargetAllocatorPrometheusCR{ + Enabled: true, + }, + }, + Mode: v1beta1.ModeStatefulSet, + Config: v1beta1.Config{ + Receivers: v1beta1.AnyConfig{ + Object: map[string]interface{}{ + "prometheus": map[string]interface{}{ + "config": map[string]interface{}{ + "scrape_configs": []interface{}{}, + }, + }, + "nop": map[string]interface{}{}, + }, + }, + Exporters: v1beta1.AnyConfig{ + Object: map[string]interface{}{ + "nop": map[string]interface{}{}, + }, + }, + Service: v1beta1.Service{ + Pipelines: map[string]*v1beta1.Pipeline{ + "logs": { + Exporters: []string{"nop"}, + Receivers: []string{"nop"}, + }, + }, + }, + }, + }, + } + + testCtx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + reconciler := createTestReconciler(t, testCtx, config.New( + config.WithCollectorImage("default-collector"), + config.WithTargetAllocatorImage("default-ta-allocator"), + config.WithOpenShiftRoutesAvailability(openshift.RoutesAvailable), + config.WithPrometheusCRAvailability(prometheus.Available), + )) + + nsn := types.NamespacedName{Name: collector.Name, Namespace: collector.Namespace} + // the base query for the underlying objects + opts := []client.ListOption{ + client.InNamespace(collector.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/managed-by": "opentelemetry-operator", + "app.kubernetes.io/instance": naming.Truncate("%s.%s", 63, nsn.Namespace, nsn.Name), + }), + } + + clientCtx := context.Background() + err := k8sClient.Create(clientCtx, collector) + require.NoError(t, err) + t.Cleanup(func() { + deleteErr := k8sClient.Delete(clientCtx, collector) + require.NoError(t, deleteErr) + }) + err = k8sClient.Get(clientCtx, nsn, collector) + require.NoError(t, err) + req := k8sreconcile.Request{ + NamespacedName: nsn, + } + _, reconcileErr := reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + configMaps := &v1.ConfigMapList{} + listErr := k8sClient.List(clientCtx, configMaps, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, configMaps) + assert.Len(collect, configMaps.Items, 2) + }, time.Second*5, time.Millisecond) + + // modify the ConfigMap, it should be kept + err = k8sClient.Get(clientCtx, nsn, collector) + require.NoError(t, err) + collector.Spec.Config.Exporters.Object["debug"] = map[string]interface{}{} + err = k8sClient.Update(clientCtx, collector) + require.NoError(t, err) + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + actual := &v1beta1.OpenTelemetryCollector{} + err = reconciler.Get(clientCtx, nsn, actual) + assert.NoError(collect, err) + assert.Equal(collect, collector.Spec, actual.Spec) + }, time.Second*5, time.Millisecond) + + _, reconcileErr = reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + configMaps := &v1.ConfigMapList{} + listErr := k8sClient.List(clientCtx, configMaps, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, configMaps) + assert.Len(collect, configMaps.Items, 3) + }, time.Second*5, time.Millisecond) + + // modify the ConfigMap again, the oldest one is still kept, but is dropped after next reconciliation + err = k8sClient.Get(clientCtx, nsn, collector) + require.NoError(t, err) + collector.Spec.Config.Exporters.Object["debug/2"] = map[string]interface{}{} + err = k8sClient.Update(clientCtx, collector) + require.NoError(t, err) + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + actual := &v1beta1.OpenTelemetryCollector{} + err = reconciler.Get(clientCtx, nsn, actual) + assert.NoError(collect, err) + assert.Equal(collect, collector.Spec, actual.Spec) + }, time.Second*5, time.Millisecond) + + _, reconcileErr = reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + configMaps := &v1.ConfigMapList{} + listErr := k8sClient.List(clientCtx, configMaps, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, configMaps) + assert.Len(collect, configMaps.Items, 4) + }, time.Second*5, time.Millisecond) + + _, reconcileErr = reconciler.Reconcile(clientCtx, req) + assert.NoError(t, reconcileErr) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + configMaps := &v1.ConfigMapList{} + listErr := k8sClient.List(clientCtx, configMaps, opts...) + assert.NoError(collect, listErr) + assert.NotEmpty(collect, configMaps) + assert.Len(collect, configMaps.Items, 3) + }, time.Second*5, time.Millisecond) +} + func TestOpAMPBridgeReconciler_Reconcile(t *testing.T) { addedMetadataDeployment := opampBridgeParams() addedMetadataDeployment.OpAMPBridge.Labels = map[string]string{ @@ -928,17 +1227,14 @@ service: clientErr = k8sClient.Create(context.Background(), otelcol) require.NoError(t, clientErr) - reconciler := controllers.NewReconciler(controllers.Params{ - Client: k8sClient, - Log: logger, - Scheme: testScheme, - Recorder: record.NewFakeRecorder(20), - Config: config.New( - config.WithCollectorImage("default-collector"), - config.WithTargetAllocatorImage("default-ta-allocator"), - config.WithRBACPermissions(autoRBAC.Available), - ), - }) + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + reconciler := createTestReconciler(t, testCtx, config.New( + config.WithCollectorImage("default-collector"), + config.WithTargetAllocatorImage("default-ta-allocator"), + config.WithRBACPermissions(autoRBAC.Available), + )) nsn := types.NamespacedName{Name: otelcol.Name, Namespace: otelcol.Namespace} req := k8sreconcile.Request{ @@ -983,3 +1279,83 @@ func namespacedObjectName(name string, namespace string) types.NamespacedName { Name: name, } } + +// getAllResources gets all the resource types owned by the controller. +func getAllOwnedResources( + ctx context.Context, + reconciler *controllers.OpenTelemetryCollectorReconciler, + owner *v1beta1.OpenTelemetryCollector, + options ...client.ListOption, +) ([]client.Object, error) { + ownedResourceTypes := reconciler.GetOwnedResourceTypes() + allResources := []client.Object{} + for _, resourceType := range ownedResourceTypes { + list := &unstructured.UnstructuredList{} + gvk, err := apiutil.GVKForObject(resourceType, k8sClient.Scheme()) + if err != nil { + return nil, err + } + list.SetGroupVersionKind(gvk) + err = k8sClient.List(ctx, list, options...) + if err != nil { + return []client.Object{}, fmt.Errorf("error listing %s: %w", gvk.Kind, err) + } + for _, obj := range list.Items { + if obj.GetDeletionTimestamp() != nil { + continue + } + + newObj := obj + if !IsOwnedBy(&newObj, owner) { + continue + } + allResources = append(allResources, &newObj) + } + } + return allResources, nil +} + +func IsOwnedBy(obj metav1.Object, owner *v1beta1.OpenTelemetryCollector) bool { + if obj.GetNamespace() != owner.GetNamespace() { + labels := obj.GetLabels() + instanceLabelValue := labels["app.kubernetes.io/instance"] + return instanceLabelValue == naming.Truncate("%s.%s", 63, owner.Namespace, owner.Name) + } + ownerReferences := obj.GetOwnerReferences() + isOwner := slices.ContainsFunc(ownerReferences, func(ref metav1.OwnerReference) bool { + return ref.UID == owner.GetUID() + }) + return isOwner +} + +func createTestReconciler(t *testing.T, ctx context.Context, cfg config.Config) *controllers.OpenTelemetryCollectorReconciler { + t.Helper() + // we need to set up caches for our reconciler + runtimeCluster, err := runtimecluster.New(restCfg, func(options *runtimecluster.Options) { + options.Scheme = testScheme + }) + require.NoError(t, err) + go func() { + startErr := runtimeCluster.Start(ctx) + require.NoError(t, startErr) + }() + + cacheClient := runtimeCluster.GetClient() + reconciler := controllers.NewReconciler(controllers.Params{ + Client: cacheClient, + Log: logger, + Scheme: testScheme, + Recorder: record.NewFakeRecorder(20), + Config: cfg, + }) + err = reconciler.SetupCaches(runtimeCluster) + require.NoError(t, err) + return reconciler +} + +func sanitizeResourceName(name string) string { + sanitized := strings.ToLower(name) + re := regexp.MustCompile("[^a-z0-9-]") + sanitized = re.ReplaceAllString(sanitized, "-") + return sanitized +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 1dc118d9dd..c32b101280 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -72,8 +72,7 @@ var ( testScheme *runtime.Scheme = scheme.Scheme ctx context.Context cancel context.CancelFunc - err error - cfg *rest.Config + restCfg *rest.Config logger = logf.Log.WithName("unit-tests") instanceUID = uuid.NewUUID() @@ -136,14 +135,11 @@ func (m *mockAutoDetect) CertManagerAvailability(ctx context.Context) (certmanag } func TestMain(m *testing.M) { + var err error ctx, cancel = context.WithCancel(context.TODO()) defer cancel() - if err != nil { - fmt.Printf("failed to start testEnv: %v", err) - os.Exit(1) - } - + // +kubebuilder:scaffold:scheme utilruntime.Must(monitoringv1.AddToScheme(testScheme)) utilruntime.Must(networkingv1.AddToScheme(testScheme)) utilruntime.Must(routev1.AddToScheme(testScheme)) @@ -157,10 +153,13 @@ func TestMain(m *testing.M) { Paths: []string{filepath.Join("..", "config", "webhook")}, }, } - cfg, err = testEnv.Start() - // +kubebuilder:scaffold:scheme + restCfg, err = testEnv.Start() + if err != nil { + fmt.Printf("failed to start testEnv: %v", err) + os.Exit(1) + } - k8sClient, err = client.New(cfg, client.Options{Scheme: testScheme}) + k8sClient, err = client.New(restCfg, client.Options{Scheme: testScheme}) if err != nil { fmt.Printf("failed to setup a Kubernetes client: %v", err) os.Exit(1) @@ -168,7 +167,7 @@ func TestMain(m *testing.M) { // start webhook server using Manager webhookInstallOptions := &testEnv.WebhookInstallOptions - mgr, mgrErr := ctrl.NewManager(cfg, ctrl.Options{ + mgr, mgrErr := ctrl.NewManager(restCfg, ctrl.Options{ Scheme: testScheme, LeaderElection: false, WebhookServer: webhook.NewServer(webhook.Options{ @@ -184,8 +183,8 @@ func TestMain(m *testing.M) { fmt.Printf("failed to start webhook server: %v", mgrErr) os.Exit(1) } - clientset, clientErr := kubernetes.NewForConfig(cfg) - if err != nil { + clientset, clientErr := kubernetes.NewForConfig(restCfg) + if clientErr != nil { fmt.Printf("failed to setup kubernetes clientset %v", clientErr) } reviewer := rbac.NewReviewer(clientset) @@ -506,10 +505,10 @@ func populateObjectIfExists(t testing.TB, object client.Object, namespacedName t } func getConfigMapSHAFromString(configStr string) (string, error) { - var config v1beta1.Config - err := yaml.Unmarshal([]byte(configStr), &config) + var cfg v1beta1.Config + err := yaml.Unmarshal([]byte(configStr), &cfg) if err != nil { return "", err } - return manifestutils.GetConfigMapSHA(config) + return manifestutils.GetConfigMapSHA(cfg) } diff --git a/tests/e2e/smoke-deletion/00-assert.yaml b/tests/e2e/smoke-deletion/00-assert.yaml new file mode 100644 index 0000000000..cbd2286258 --- /dev/null +++ b/tests/e2e/smoke-deletion/00-assert.yaml @@ -0,0 +1,65 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: stateful-collector +--- +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app.kubernetes.io/name: stateful-collector +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: stateful-collector +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: stateful-collector +--- +apiVersion: v1 +kind: Service +metadata: + name: stateful-collector +--- +apiVersion: v1 +kind: Service +metadata: + name: stateful-collector-headless +--- +apiVersion: v1 +kind: Service +metadata: + name: stateful-collector-monitoring +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: stateful-monitoring-collector +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: stateful-targetallocator +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: stateful-targetallocator +--- +apiVersion: v1 +kind: Service +metadata: + name: stateful-targetallocator +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: stateful-targetallocator +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: stateful-targetallocator \ No newline at end of file diff --git a/tests/e2e/smoke-deletion/00-install.yaml b/tests/e2e/smoke-deletion/00-install.yaml new file mode 100644 index 0000000000..b1d1bb36a5 --- /dev/null +++ b/tests/e2e/smoke-deletion/00-install.yaml @@ -0,0 +1,73 @@ +apiVersion: v1 +automountServiceAccountToken: true +kind: ServiceAccount +metadata: + name: ta +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: smoke-targetallocator +rules: +- apiGroups: + - "" + resources: + - pods + - namespaces + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: (join('-', ['default-view', $namespace])) +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: smoke-targetallocator +subjects: +- kind: ServiceAccount + name: ta + namespace: ($namespace) +--- +apiVersion: opentelemetry.io/v1beta1 +kind: OpenTelemetryCollector +metadata: + name: stateful +spec: + autoscaler: + minReplicas: 1 + maxReplicas: 1 + targetCPUUtilization: 50 + config: + receivers: + # Collect own metrics + prometheus: + config: + scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 10s + static_configs: + - targets: [ '0.0.0.0:8888' ] + exporters: + debug: + service: + pipelines: + metrics: + receivers: [prometheus] + exporters: [debug] + mode: statefulset + ports: + - port: 9999 + name: test + targetAllocator: + enabled: true + serviceAccount: ta + observability: + metrics: + enableMetrics: true + observability: + metrics: + enableMetrics: true diff --git a/tests/e2e/smoke-deletion/01-assert.yaml b/tests/e2e/smoke-deletion/01-assert.yaml new file mode 100644 index 0000000000..7ea22c086a --- /dev/null +++ b/tests/e2e/smoke-deletion/01-assert.yaml @@ -0,0 +1,38 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: stateful-collector +--- +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app.kubernetes.io/name: stateful-collector +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: stateful-collector +--- +(x_k8s_exists($client, 'autoscaling/v2', 'HorizontalPodAutoscaler', $namespace, 'stateful-collector')): false +--- +(x_k8s_exists($client, 'v1', 'Service', $namespace, 'stateful-collector')): false +--- +(x_k8s_exists($client, 'v1', 'Service', $namespace, 'stateful-collector-headless')): false +--- +apiVersion: v1 +kind: Service +metadata: + name: stateful-collector-monitoring +--- +(x_k8s_exists($client, 'monitoring.coreos.com/v1', 'ServiceMonitor', $namespace, 'stateful-monitoring-collector')): false +--- +(x_k8s_exists($client, 'apps/v1', 'Deployment', $namespace, 'stateful-targetallocator')): false +--- +(x_k8s_exists($client, 'v1', 'ConfigMap', $namespace, 'stateful-targetallocator')): false +--- +(x_k8s_exists($client, 'v1', 'Service', $namespace, 'stateful-targetallocator')): false +--- +(x_k8s_exists($client, 'policy/v1', 'PodDisruptionBudget', $namespace, 'stateful-targetallocator')): false +--- +(x_k8s_exists($client, 'monitoring.coreos.com/v1', 'ServiceMonitor', $namespace, 'stateful-targetallocator')): false diff --git a/tests/e2e/smoke-deletion/01-install.yaml b/tests/e2e/smoke-deletion/01-install.yaml new file mode 100644 index 0000000000..ccb9caf23a --- /dev/null +++ b/tests/e2e/smoke-deletion/01-install.yaml @@ -0,0 +1,22 @@ +apiVersion: opentelemetry.io/v1beta1 +kind: OpenTelemetryCollector +metadata: + name: stateful +spec: + autoscaler: null + config: + receivers: + nop: + exporters: + nop: + service: + pipelines: + metrics: + receivers: [nop] + exporters: [nop] + ports: [] + targetAllocator: + enabled: false + observability: + metrics: + enableMetrics: false \ No newline at end of file diff --git a/tests/e2e/smoke-deletion/chainsaw-test.yaml b/tests/e2e/smoke-deletion/chainsaw-test.yaml new file mode 100755 index 0000000000..13fef94948 --- /dev/null +++ b/tests/e2e/smoke-deletion/chainsaw-test.yaml @@ -0,0 +1,26 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/kyverno/chainsaw/main/.schemas/json/test-chainsaw-v1alpha1.json +apiVersion: chainsaw.kyverno.io/v1alpha1 +kind: Test +metadata: + name: smoke-deletion +spec: + steps: + - name: step-00 + try: + - apply: + template: true + file: 00-install.yaml + - assert: + file: 00-assert.yaml + catch: + - podLogs: + selector: app.kubernetes.io/component=opentelemetry-targetallocator + - name: step-01 + try: + - apply: + file: 01-install.yaml + - assert: + file: 01-assert.yaml + catch: + - podLogs: + selector: app.kubernetes.io/component=opentelemetry-targetallocator