Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/k8sclient/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (c *Client) ApplyManifest(
switch result.Operation {
case manifest.OperationCreate:
_, applyErr = c.CreateResource(ctx, newManifest)
if applyErr != nil && apierrors.IsAlreadyExists(applyErr) {
// Resource was created by a concurrent process between our Get and Create.
// Treat as a successful no-op rather than an error.
c.log.Debugf(ctx, "Resource %s/%s already exists (concurrent create), treating as skip", gvk.Kind, name)
result.Operation = manifest.OperationSkip
result.Reason = "already exists (concurrent create)"
applyErr = nil
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for this change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test coverage in apply_test.go. TestApplyManifest_CreateAlreadyExists verifies the concurrent create race condition: first ApplyManifest creates the resource, second ApplyManifest with existing=nil hits AlreadyExists and returns OperationSkip with reason "already exists (concurrent create)". Also added tests for create success, same-generation skip, and nil manifest validation.

case manifest.OperationUpdate:
// Preserve resourceVersion and UID from existing for update
Expand Down
100 changes: 100 additions & 0 deletions internal/k8sclient/apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package k8sclient

import (
"context"
"fmt"
"testing"

"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest"
"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func newTestClient() *Client {
scheme := runtime.NewScheme()
builder := fake.NewClientBuilder().WithScheme(scheme)
log, _ := logger.NewLogger(logger.Config{Level: "error", Output: "stdout", Format: "json"})
return &Client{
client: builder.Build(),
log: log,
}
}

func newConfigMap(name, namespace string, generation int64) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(CommonResourceKinds.ConfigMap)
obj.SetName(name)
obj.SetNamespace(namespace)
obj.SetAnnotations(map[string]string{
"hyperfleet.io/generation": fmt.Sprintf("%d", generation),
})
obj.Object["data"] = map[string]any{
"key": "value",
}
return obj
}

func TestApplyManifest_CreateAlreadyExists(t *testing.T) {
ctx := context.Background()
c := newTestClient()

cm := newConfigMap("test-cm", "default", 1)

// First create should succeed
result1, err := c.ApplyManifest(ctx, cm, nil, nil)
require.NoError(t, err)
assert.Equal(t, manifest.OperationCreate, result1.Operation)

// Second create with nil existing (simulates concurrent create race)
// ApplyManifest sees existing=nil so decides to create, but resource already exists
cm2 := newConfigMap("test-cm", "default", 1)
result2, err := c.ApplyManifest(ctx, cm2, nil, nil)
require.NoError(t, err)
assert.Equal(t, manifest.OperationSkip, result2.Operation)
assert.Equal(t, "already exists (concurrent create)", result2.Reason)
}

func TestApplyManifest_CreateSuccess(t *testing.T) {
ctx := context.Background()
c := newTestClient()

cm := newConfigMap("new-cm", "default", 1)
result, err := c.ApplyManifest(ctx, cm, nil, nil)
require.NoError(t, err)
assert.Equal(t, manifest.OperationCreate, result.Operation)
}

func TestApplyManifest_SkipSameGeneration(t *testing.T) {
ctx := context.Background()
c := newTestClient()

cm := newConfigMap("existing-cm", "default", 1)

// Create the resource first
_, err := c.CreateResource(ctx, cm)
require.NoError(t, err)

// Get existing to pass to ApplyManifest
existing, err := c.GetResource(ctx, CommonResourceKinds.ConfigMap, "default", "existing-cm", nil)
require.NoError(t, err)

// Apply with same generation should skip
newCm := newConfigMap("existing-cm", "default", 1)
result, err := c.ApplyManifest(ctx, newCm, existing, nil)
require.NoError(t, err)
assert.Equal(t, manifest.OperationSkip, result.Operation)
}

func TestApplyManifest_NilManifest(t *testing.T) {
ctx := context.Background()
c := newTestClient()

result, err := c.ApplyManifest(ctx, nil, nil, nil)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "new manifest cannot be nil")
}
26 changes: 0 additions & 26 deletions internal/k8sclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ func (c *Client) CreateResource(
namespace := obj.GetNamespace()
name := obj.GetName()

c.log.Infof(ctx, "Creating resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

err := c.client.Create(ctx, obj)
if err != nil {
if apierrors.IsAlreadyExists(err) {
Expand All @@ -138,8 +136,6 @@ func (c *Client) CreateResource(
Err: err,
}
}

c.log.Infof(ctx, "Successfully created resource: %s/%s", gvk.Kind, name)
return obj, nil
}

Expand All @@ -150,8 +146,6 @@ func (c *Client) GetResource(
namespace, name string,
_ transportclient.TransportContext,
) (*unstructured.Unstructured, error) {
c.log.Infof(ctx, "Getting resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)

Expand All @@ -175,8 +169,6 @@ func (c *Client) GetResource(
Err: err,
}
}

c.log.Infof(ctx, "Successfully retrieved resource: %s/%s", gvk.Kind, name)
return obj, nil
}

Expand All @@ -194,9 +186,6 @@ func (c *Client) ListResources(
namespace string,
labelSelector string,
) (*unstructured.UnstructuredList, error) {
c.log.Infof(ctx, "Listing resources: %s (namespace: %s, labelSelector: %s)",
gvk.Kind, namespace, labelSelector)

list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)

Expand Down Expand Up @@ -228,8 +217,6 @@ func (c *Client) ListResources(
}
}

c.log.Infof(ctx, "Successfully listed resources: %s (found %d items)",
gvk.Kind, len(list.Items))
return list, nil
}

Expand Down Expand Up @@ -261,8 +248,6 @@ func (c *Client) UpdateResource(
namespace := obj.GetNamespace()
name := obj.GetName()

c.log.Infof(ctx, "Updating resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

err := c.client.Update(ctx, obj)
if err != nil {
if apierrors.IsConflict(err) {
Expand All @@ -277,15 +262,11 @@ func (c *Client) UpdateResource(
Err: err,
}
}

c.log.Infof(ctx, "Successfully updated resource: %s/%s", gvk.Kind, name)
return obj, nil
}

// DeleteResource deletes a Kubernetes resource
func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string) error {
c.log.Infof(ctx, "Deleting resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetNamespace(namespace)
Expand All @@ -294,7 +275,6 @@ func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind
err := c.client.Delete(ctx, obj)
if err != nil {
if apierrors.IsNotFound(err) {
c.log.Infof(ctx, "Resource already deleted: %s/%s", gvk.Kind, name)
return nil
}
return &apperrors.K8sOperationError{
Expand All @@ -306,8 +286,6 @@ func (c *Client) DeleteResource(ctx context.Context, gvk schema.GroupVersionKind
Err: err,
}
}

c.log.Infof(ctx, "Successfully deleted resource: %s/%s", gvk.Kind, name)
return nil
}

Expand Down Expand Up @@ -342,8 +320,6 @@ func (c *Client) PatchResource(
namespace, name string,
patchData []byte,
) (*unstructured.Unstructured, error) {
c.log.Infof(ctx, "Patching resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace)

// Parse patch data to validate JSON
var patchObj map[string]interface{}
if err := json.Unmarshal(patchData, &patchObj); err != nil {
Expand Down Expand Up @@ -376,8 +352,6 @@ func (c *Client) PatchResource(
}
}

c.log.Infof(ctx, "Successfully patched resource: %s/%s", gvk.Kind, name)

// Get the updated resource to return
return c.GetResource(ctx, gvk, namespace, name, nil)
}
2 changes: 0 additions & 2 deletions internal/maestroclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ func (c *Client) ApplyResource(
// Set namespace to consumer name
work.Namespace = consumerName

c.log.Infof(ctx, "Applying ManifestWork %s/%s", consumerName, work.Name)

// Apply the ManifestWork (create or update with generation comparison)
result, err := c.ApplyManifestWork(ctx, consumerName, work)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions internal/maestroclient/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func (c *Client) CreateManifestWork(
ctx = logger.WithLogField(ctx, "manifestwork", work.Name)
ctx = logger.WithObservedGeneration(ctx, manifest.GetGeneration(work.ObjectMeta))

c.log.WithFields(map[string]interface{}{
"manifests": len(work.Spec.Workload.Manifests),
}).Debug(ctx, "Creating ManifestWork")

// Set namespace to consumer name (required by Maestro)
work.Namespace = consumerName

Expand All @@ -68,7 +64,6 @@ func (c *Client) CreateManifestWork(
consumerName, work.Name, err)
}

c.log.Info(ctx, "Created ManifestWork")
return created, nil
}

Expand All @@ -81,8 +76,6 @@ func (c *Client) GetManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Getting ManifestWork")

work, err := c.workClient.ManifestWorks(consumerName).Get(ctx, workName, metav1.GetOptions{})
if err != nil {
// Return not found error without wrapping for callers to check
Expand All @@ -106,8 +99,6 @@ func (c *Client) PatchManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Patching ManifestWork")

patched, err := c.workClient.ManifestWorks(consumerName).Patch(
ctx,
workName,
Expand All @@ -120,7 +111,6 @@ func (c *Client) PatchManifestWork(
consumerName, workName, err)
}

c.log.Info(ctx, "Patched ManifestWork")
return patched, nil
}

Expand All @@ -133,20 +123,16 @@ func (c *Client) DeleteManifestWork(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Deleting ManifestWork")

err := c.workClient.ManifestWorks(consumerName).Delete(ctx, workName, metav1.DeleteOptions{})
if err != nil {
// Ignore not found errors (already deleted)
if apierrors.IsNotFound(err) {
c.log.Debug(ctx, "ManifestWork already deleted")
return nil
}
return apperrors.MaestroError("failed to delete ManifestWork %s/%s: %v",
consumerName, workName, err)
}

c.log.Info(ctx, "Deleted ManifestWork")
return nil
}

Expand All @@ -158,10 +144,6 @@ func (c *Client) ListManifestWorks(
) (*workv1.ManifestWorkList, error) {
ctx = logger.WithMaestroConsumer(ctx, consumerName)

c.log.WithFields(map[string]interface{}{
"labelSelector": labelSelector,
}).Debug(ctx, "Listing ManifestWorks")

opts := metav1.ListOptions{}
if labelSelector != "" {
opts.LabelSelector = labelSelector
Expand All @@ -173,9 +155,6 @@ func (c *Client) ListManifestWorks(
consumerName, err)
}

c.log.WithFields(map[string]interface{}{
"count": len(list.Items),
}).Debug(ctx, "Listed ManifestWorks")
return list, nil
}

Expand Down Expand Up @@ -217,8 +196,6 @@ func (c *Client) ApplyManifestWork(
ctx = logger.WithLogField(ctx, "manifestwork", manifestWork.Name)
ctx = logger.WithObservedGeneration(ctx, newGeneration)

c.log.Debug(ctx, "Applying ManifestWork")

// Check if ManifestWork exists
existing, err := c.GetManifestWork(ctx, consumerName, manifestWork.Name)
exists := err == nil
Expand Down Expand Up @@ -307,8 +284,6 @@ func (c *Client) DiscoverManifest(
ctx = logger.WithMaestroConsumer(ctx, consumerName)
ctx = logger.WithLogField(ctx, "manifestwork", workName)

c.log.Debug(ctx, "Discovering manifests in ManifestWork")

// Get the ManifestWork
work, err := c.GetManifestWork(ctx, consumerName, workName)
if err != nil {
Expand All @@ -329,10 +304,6 @@ func (c *Client) DiscoverManifest(
consumerName, workName, err)
}

c.log.WithFields(map[string]interface{}{
"found": len(list.Items),
}).Debug(ctx, "Discovered manifests in ManifestWork")

return list, nil
}

Expand Down