Skip to content

Commit

Permalink
Allow setting target allocator via label (#3411)
Browse files Browse the repository at this point in the history
* Allow setting target allocator via label

* Move label definition to constants package

* Fix context handling in collector webhook build validator
  • Loading branch information
swiatekm authored Nov 5, 2024
1 parent 05228b9 commit 05a55b6
Show file tree
Hide file tree
Showing 18 changed files with 399 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
- e2e-pdb
- e2e-prometheuscr
- e2e-targetallocator
- e2e-targetallocator-cr
- e2e-upgrade
- e2e-multi-instrumentation
- e2e-metadata-filters
Expand All @@ -51,6 +52,8 @@ jobs:
kube-version: "1.29"
- group: e2e-targetallocator
setup: "enable-targetallocator-cr prepare-e2e"
- group: e2e-targetallocator-cr
setup: "enable-targetallocator-cr prepare-e2e"
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ e2e-prometheuscr: chainsaw
e2e-targetallocator: chainsaw
$(CHAINSAW) test --test-dir ./tests/e2e-targetallocator

# Target allocator CR end-to-tests
.PHONY: e2e-targetallocator-cr
e2e-targetallocator-cr: chainsaw
$(CHAINSAW) test --test-dir ./tests/e2e-targetallocator-cr

.PHONY: add-certmanager-permissions
add-certmanager-permissions:
# Kustomize only allows patches in the folder where the kustomization is located
Expand Down
6 changes: 3 additions & 3 deletions apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object
c.metrics.create(ctx, otelcol)
}
if c.bv != nil {
newWarnings := c.bv(*otelcol)
newWarnings := c.bv(ctx, *otelcol)
warnings = append(warnings, newWarnings...)
}
return warnings, nil
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c CollectorWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj run
}

if c.bv != nil {
newWarnings := c.bv(*otelcol)
newWarnings := c.bv(ctx, *otelcol)
warnings = append(warnings, newWarnings...)
}
return warnings, nil
Expand Down Expand Up @@ -435,7 +435,7 @@ func checkAutoscalerSpec(autoscaler *AutoscalerSpec) error {

// BuildValidator enables running the manifest generators for the collector reconciler
// +kubebuilder:object:generate=false
type BuildValidator func(c OpenTelemetryCollector) admission.Warnings
type BuildValidator func(ctx context.Context, c OpenTelemetryCollector) admission.Warnings

func NewCollectorWebhook(
logger logr.Logger,
Expand Down
8 changes: 4 additions & 4 deletions apis/v1beta1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestValidate(t *testing.T) {
},
}

bv := func(collector v1beta1.OpenTelemetryCollector) admission.Warnings {
bv := func(_ context.Context, collector v1beta1.OpenTelemetryCollector) admission.Warnings {
var warnings admission.Warnings
cfg := config.New(
config.WithCollectorImage("default-collector"),
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
},
}

bv := func(collector v1beta1.OpenTelemetryCollector) admission.Warnings {
bv := func(_ context.Context, collector v1beta1.OpenTelemetryCollector) admission.Warnings {
var warnings admission.Warnings
cfg := config.New(
config.WithCollectorImage("default-collector"),
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func TestOTELColValidatingWebhook(t *testing.T) {
},
}

bv := func(collector v1beta1.OpenTelemetryCollector) admission.Warnings {
bv := func(_ context.Context, collector v1beta1.OpenTelemetryCollector) admission.Warnings {
var warnings admission.Warnings
cfg := config.New(
config.WithCollectorImage("default-collector"),
Expand Down Expand Up @@ -1433,7 +1433,7 @@ func TestOTELColValidateUpdateWebhook(t *testing.T) {
},
}

bv := func(collector v1beta1.OpenTelemetryCollector) admission.Warnings {
bv := func(_ context.Context, collector v1beta1.OpenTelemetryCollector) admission.Warnings {
var warnings admission.Warnings
cfg := config.New(
config.WithCollectorImage("default-collector"),
Expand Down
21 changes: 18 additions & 3 deletions controllers/opentelemetrycollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/autodetect/openshift"
"github.com/open-telemetry/opentelemetry-operator/internal/autodetect/prometheus"
Expand All @@ -47,6 +48,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils"
collectorStatus "github.com/open-telemetry/opentelemetry-operator/internal/status/collector"
"github.com/open-telemetry/opentelemetry-operator/pkg/constants"
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
)

Expand Down Expand Up @@ -168,7 +170,7 @@ func (r *OpenTelemetryCollectorReconciler) getConfigMapsToRemove(configVersionsT
return ownedConfigMaps
}

func (r *OpenTelemetryCollectorReconciler) GetParams(instance v1beta1.OpenTelemetryCollector) (manifests.Params, error) {
func (r *OpenTelemetryCollectorReconciler) GetParams(ctx context.Context, instance v1beta1.OpenTelemetryCollector) (manifests.Params, error) {
p := manifests.Params{
Config: r.config,
Client: r.Client,
Expand All @@ -179,14 +181,27 @@ func (r *OpenTelemetryCollectorReconciler) GetParams(instance v1beta1.OpenTeleme
}

// generate the target allocator CR from the collector CR
targetAllocator, err := collector.TargetAllocator(p)
targetAllocator, err := r.getTargetAllocator(ctx, p)
if err != nil {
return p, err
}
p.TargetAllocator = targetAllocator
return p, nil
}

func (r *OpenTelemetryCollectorReconciler) getTargetAllocator(ctx context.Context, params manifests.Params) (*v1alpha1.TargetAllocator, error) {
if taName, ok := params.OtelCol.GetLabels()[constants.LabelTargetAllocator]; ok {
targetAllocator := &v1alpha1.TargetAllocator{}
taKey := client.ObjectKey{Name: taName, Namespace: params.OtelCol.GetNamespace()}
err := r.Client.Get(ctx, taKey, targetAllocator)
if err != nil {
return nil, err
}
return targetAllocator, nil
}
return collector.TargetAllocator(params)
}

// NewReconciler creates a new reconciler for OpenTelemetryCollector objects.
func NewReconciler(p Params) *OpenTelemetryCollectorReconciler {
r := &OpenTelemetryCollectorReconciler{
Expand Down Expand Up @@ -230,7 +245,7 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct
return ctrl.Result{}, client.IgnoreNotFound(err)
}

params, err := r.GetParams(instance)
params, err := r.GetParams(ctx, instance)
if err != nil {
log.Error(err, "Failed to create manifest.Params")
return ctrl.Result{}, err
Expand Down
1 change: 0 additions & 1 deletion controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/testdata"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils"
"github.com/open-telemetry/opentelemetry-operator/internal/rbac"
// +kubebuilder:scaffold:imports
)

var (
Expand Down
53 changes: 52 additions & 1 deletion controllers/targetallocator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/internal/config"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/targetallocator"
taStatus "github.com/open-telemetry/opentelemetry-operator/internal/status/targetallocator"
"github.com/open-telemetry/opentelemetry-operator/pkg/constants"
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
)

Expand Down Expand Up @@ -98,7 +99,24 @@ func (r *TargetAllocatorReconciler) getCollector(ctx context.Context, instance v
return &collector, nil
}

return nil, nil
var collectors v1beta1.OpenTelemetryCollectorList
listOpts := []client.ListOption{
client.InNamespace(instance.GetNamespace()),
client.MatchingLabels{
constants.LabelTargetAllocator: instance.GetName(),
},
}
err := r.List(ctx, &collectors, listOpts...)
if err != nil {
return nil, err
}
if len(collectors.Items) == 0 {
return nil, nil
} else if len(collectors.Items) > 1 {
return nil, fmt.Errorf("found multiple OpenTelemetry collectors annotated with the same Target Allocator: %s/%s", instance.GetNamespace(), instance.GetName())
}

return &collectors.Items[0], nil
}

// NewTargetAllocatorReconciler creates a new reconciler for TargetAllocator objects.
Expand Down Expand Up @@ -195,6 +213,25 @@ func (r *TargetAllocatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
),
)

// watch collectors which have the target allocator label
collectorSelector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: constants.LabelTargetAllocator,
Operator: metav1.LabelSelectorOpExists,
},
},
}
selectorPredicate, err := predicate.LabelSelectorPredicate(collectorSelector)
if err != nil {
return err
}
ctrlBuilder.Watches(
&v1beta1.OpenTelemetryCollector{},
handler.EnqueueRequestsFromMapFunc(getTargetAllocatorRequestsFromLabel),
builder.WithPredicates(selectorPredicate),
)

return ctrlBuilder.Complete(r)
}

Expand All @@ -208,3 +245,17 @@ func getTargetAllocatorForCollector(_ context.Context, collector client.Object)
},
}
}

func getTargetAllocatorRequestsFromLabel(_ context.Context, collector client.Object) []reconcile.Request {
if taName, ok := collector.GetLabels()[constants.LabelTargetAllocator]; ok {
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: taName,
Namespace: collector.GetNamespace(),
},
},
}
}
return []reconcile.Request{}
}
56 changes: 55 additions & 1 deletion controllers/targetallocator_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/config"
"github.com/open-telemetry/opentelemetry-operator/pkg/constants"
)

var testLogger = logf.Log.WithName("opamp-bridge-controller-unit-tests")
Expand All @@ -55,7 +56,10 @@ func init() {
func TestTargetAllocatorReconciler_GetCollector(t *testing.T) {
testCollector := &v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Name: "my-instance-collector",
Name: "test",
Labels: map[string]string{
constants.LabelTargetAllocator: "label-ta",
},
},
}
fakeClient := fake.NewFakeClient(testCollector)
Expand Down Expand Up @@ -105,6 +109,36 @@ func TestTargetAllocatorReconciler_GetCollector(t *testing.T) {
assert.Nil(t, collector)
assert.Errorf(t, err, "error getting owner for TargetAllocator default/test: opentelemetrycollectors.opentelemetry.io \"non_existent\" not found")
})
t.Run("collector attached by label", func(t *testing.T) {
ta := v1alpha1.TargetAllocator{
ObjectMeta: metav1.ObjectMeta{
Name: "label-ta",
},
}
collector, err := reconciler.getCollector(context.Background(), ta)
require.NoError(t, err)
assert.Equal(t, testCollector, collector)
})
t.Run("multiple collectors attached by label", func(t *testing.T) {
testCollector2 := testCollector.DeepCopy()
testCollector2.SetName("test2")
fakeClient := fake.NewFakeClient(testCollector, testCollector2)
reconciler := NewTargetAllocatorReconciler(
fakeClient,
testScheme,
record.NewFakeRecorder(10),
config.New(),
testLogger,
)
ta := v1alpha1.TargetAllocator{
ObjectMeta: metav1.ObjectMeta{
Name: "label-ta",
},
}
collector, err := reconciler.getCollector(context.Background(), ta)
assert.Nil(t, collector)
assert.Errorf(t, err, "found multiple OpenTelemetry collectors annotated with the same Target Allocator: %s/%s", ta.Namespace, ta.Name)
})
}

func TestGetTargetAllocatorForCollector(t *testing.T) {
Expand All @@ -123,3 +157,23 @@ func TestGetTargetAllocatorForCollector(t *testing.T) {
}}
assert.Equal(t, expected, requests)
}

func TestGetTargetAllocatorRequestsFromLabel(t *testing.T) {
testCollector := &v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Labels: map[string]string{
constants.LabelTargetAllocator: "label-ta",
},
},
}
requests := getTargetAllocatorRequestsFromLabel(context.Background(), testCollector)
expected := []reconcile.Request{{
NamespacedName: types.NamespacedName{
Name: "label-ta",
Namespace: "default",
},
}}
assert.Equal(t, expected, requests)
}
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func main() {

}

bv := func(collector otelv1beta1.OpenTelemetryCollector) admission.Warnings {
bv := func(ctx context.Context, collector otelv1beta1.OpenTelemetryCollector) admission.Warnings {
var warnings admission.Warnings
params, newErr := collectorReconciler.GetParams(collector)
params, newErr := collectorReconciler.GetParams(ctx, collector)
if err != nil {
warnings = append(warnings, newErr.Error())
return warnings
Expand Down
1 change: 1 addition & 0 deletions pkg/constants/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
LabelAppVersion = "app.kubernetes.io/version"
LabelAppPartOf = "app.kubernetes.io/part-of"

LabelTargetAllocator = "opentelemetry.io/target-allocator"
ResourceAttributeAnnotationPrefix = "resource.opentelemetry.io/"

EnvPodName = "OTEL_RESOURCE_ATTRIBUTES_POD_NAME"
Expand Down
40 changes: 40 additions & 0 deletions tests/e2e-targetallocator-cr/targetallocator-label/00-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
labels:
app.kubernetes.io/name: ta-collector
data:
collector.yaml: |
receivers:
prometheus:
config:
scrape_configs:
- job_name: otel-collector
scrape_interval: 10s
static_configs:
- targets:
- 0.0.0.0:8888
exporters:
debug: {}
service:
telemetry:
metrics:
address: 0.0.0.0:8888
pipelines:
metrics:
exporters:
- debug
receivers:
- prometheus
---
apiVersion: v1
data:
targetallocator.yaml: |
allocation_strategy: consistent-hashing
collector_selector: null
filter_strategy: ""
kind: ConfigMap
metadata:
name: ta-targetallocator
Loading

0 comments on commit 05a55b6

Please sign in to comment.