Skip to content

Commit

Permalink
add apply op result to client.Apply (BREAKING)
Browse files Browse the repository at this point in the history
In cases we want to know the result of an apply. Hence use HTTP
status code to indicate the operation result from the store.

resolves #17
  • Loading branch information
jlarfors committed Mar 16, 2024
1 parent 8d4bb1c commit c966403
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 71 deletions.
2 changes: 1 addition & 1 deletion examples/greetings/greetings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGreeting(t *testing.T) {
Name: "Pekka",
},
}
err = greetClient.Apply(ctx, greeting)
_, err = greetClient.Apply(ctx, greeting)
if err != nil {
t.Fatal("applying greeting: ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/greetings/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (r *GreetingReconciler) Reconcile(
Error: fmt.Sprintf("running hello action: %s", err),
Response: "",
}
if err := r.GreetingClient.Apply(ctx, applyGreet); err != nil {
if _, err := r.GreetingClient.Apply(ctx, applyGreet); err != nil {
return hz.Result{}, fmt.Errorf("updating greeting: %w", err)
}
return hz.Result{}, fmt.Errorf("running hello action: %w", err)
Expand All @@ -56,7 +56,7 @@ func (r *GreetingReconciler) Reconcile(
Error: "",
Response: reply.Status.Response,
}
if err := r.GreetingClient.Apply(ctx, applyGreet); err != nil {
if _, err := r.GreetingClient.Apply(ctx, applyGreet); err != nil {
return hz.Result{}, fmt.Errorf("updating greeting: %w", err)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/extensions/accounts/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *AccountReconciler) Reconcile(
accountApply.Status.Ready = true
// Save the account and trigger a requeue to publish the account in
// nats.
if err := accClient.Apply(ctx, accountApply); err != nil {
if _, err := accClient.Apply(ctx, accountApply); err != nil {
return hz.Result{}, fmt.Errorf("updating account: %w", err)
}
return hz.Result{}, nil
Expand Down Expand Up @@ -131,7 +131,7 @@ func (r *AccountReconciler) Reconcile(
if !ready {
if account.Status.Ready {
accountApply.Status.Ready = false
if err := accClient.Apply(ctx, accountApply); err != nil {
if _, err := accClient.Apply(ctx, accountApply); err != nil {
return hz.Result{}, fmt.Errorf("updating account: %w", err)
}
return hz.Result{}, nil
Expand All @@ -141,7 +141,7 @@ func (r *AccountReconciler) Reconcile(

if !account.Status.Ready {
accountApply.Status.Ready = true
if err := accClient.Apply(ctx, accountApply); err != nil {
if _, err := accClient.Apply(ctx, accountApply); err != nil {
return hz.Result{}, fmt.Errorf("updating account: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/accounts/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAccount(t *testing.T) {
Spec: &accounts.AccountSpec{},
}
accClient := hz.ObjectClient[accounts.Account]{Client: client}
err := accClient.Apply(ctx, account)
_, err := accClient.Apply(ctx, account)
tu.AssertNoError(t, err)

hztest.WatchWaitUntil(
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/secrets/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSecrets(t *testing.T) {
hz.WithClientInternal(true),
hz.WithClientDefaultManager(),
)
err = client.Apply(ctx, hz.WithApplyObject(secret))
_, err = client.Apply(ctx, hz.WithApplyObject(secret))
tu.AssertNoError(t, err)

raw, err := client.Get(ctx, hz.WithGetKey(secret))
Expand Down
4 changes: 2 additions & 2 deletions pkg/extensions/serviceaccounts/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r Reconciler) Reconcile(
"nats.creds": userCreds,
},
}
if err := secretClient.Apply(ctx, saSecret); err != nil {
if _, err := secretClient.Apply(ctx, saSecret); err != nil {
return hz.Result{}, fmt.Errorf("applying secret: %w", err)
}
}
Expand All @@ -101,7 +101,7 @@ func (r Reconciler) Reconcile(
Ready: true,
NATSCredentialsSecretName: &saSecret.Name,
}
if err := saClient.Apply(ctx, saApply); err != nil {
if _, err := saClient.Apply(ctx, saApply); err != nil {
return hz.Result{}, fmt.Errorf("applying service account: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/serviceaccounts/serviceaccount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestServiceAccounts(t *testing.T) {
hz.WithClientInternal(true),
)

if err := client.Apply(ctx, hz.WithApplyObject(svcAcc)); err != nil {
if _, err := client.Apply(ctx, hz.WithApplyObject(svcAcc)); err != nil {
t.Fatalf("apply service account: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (o *ObjectsHandler) apply(w http.ResponseWriter, r *http.Request) {
)
return
}
if err := client.Apply(r.Context(), hz.WithApplyObject(obj)); err != nil {
if _, err := client.Apply(r.Context(), hz.WithApplyObject(obj)); err != nil {
httpError(w, err)
return
}
Expand Down
67 changes: 48 additions & 19 deletions pkg/hz/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (oc ObjectClient[T]) Apply(
ctx context.Context,
object T,
opts ...ApplyOption,
) error {
) (ApplyOpResult, error) {
opts = append(opts, WithApplyObject(object))
return oc.Client.Apply(ctx, opts...)
}
Expand Down Expand Up @@ -426,14 +426,7 @@ func (c Client) Validate(
return fmt.Errorf("request: %w", err)
}

status, err := strconv.Atoi(reply.Header.Get(HeaderStatus))
if err != nil {
return fmt.Errorf("invalid status header: %w", err)
}
return &Error{
Status: status,
Message: string(reply.Data),
}
return ErrorFromNATS(reply)
}

type ApplyOption func(*applyOptions)
Expand Down Expand Up @@ -462,20 +455,30 @@ func WithApplyForce(force bool) ApplyOption {
}
}

type ApplyOpResult string

const (
ApplyOpResultCreated ApplyOpResult = "created"
ApplyOpResultUpdated ApplyOpResult = "updated"
ApplyOpResultNoop ApplyOpResult = "noop"
ApplyOpResultConflict ApplyOpResult = "conflict"
ApplyOpResultError ApplyOpResult = "error"
)

func (c Client) Apply(
ctx context.Context,
opts ...ApplyOption,
) error {
) (ApplyOpResult, error) {
if err := c.checkSession(); err != nil {
return err
return ApplyOpResultError, err
}
ao := applyOptions{}
for _, opt := range opts {
opt(&ao)
}

if c.Manager == "" {
return ErrApplyManagerRequired
return ApplyOpResultError, ErrApplyManagerRequired
}
var (
key ObjectKeyer
Expand All @@ -485,23 +488,29 @@ func (c Client) Apply(
var err error
data, err = c.marshalObjectWithTypeFields(ao.object)
if err != nil {
return fmt.Errorf("marshalling object: %w", err)
return ApplyOpResultError, fmt.Errorf("marshalling object: %w", err)
}
key = ao.object
}
if ao.data != nil {
var obj MetaOnlyObject
if err := json.Unmarshal(ao.data, &obj); err != nil {
return fmt.Errorf("unmarshalling data: %w", err)
return ApplyOpResultError, fmt.Errorf("unmarshalling data: %w", err)
}
key = obj
data = ao.data
}
if key == nil {
return fmt.Errorf("apply: %w", ErrClientObjectOrDataRequired)
return ApplyOpResultError, fmt.Errorf(
"apply: %w",
ErrClientObjectOrDataRequired,
)
}
if data == nil {
return fmt.Errorf("apply: %w", ErrClientObjectOrDataRequired)
return ApplyOpResultError, fmt.Errorf(
"apply: %w",
ErrClientObjectOrDataRequired,
)
}
msg := nats.NewMsg(
c.SubjectPrefix() + fmt.Sprintf(
Expand All @@ -522,15 +531,35 @@ func (c Client) Apply(
reply, err := c.Conn.RequestMsgWithContext(ctx, msg)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
return fmt.Errorf(
return ApplyOpResultError, fmt.Errorf(
"subject %q: %w",
msg.Subject,
ErrStoreNotResponding,
)
}
return fmt.Errorf("applying object: %w", err)
return ApplyOpResultError, fmt.Errorf("applying object: %w", err)
}
headerStatus := reply.Header.Get(HeaderStatus)
status, err := strconv.Atoi(headerStatus)
if err != nil {
return ApplyOpResultError, fmt.Errorf(
"invalid status header %q: %w",
headerStatus,
err,
)
}
switch status {
case http.StatusCreated:
return ApplyOpResultCreated, nil
case http.StatusOK:
return ApplyOpResultUpdated, nil
case http.StatusNotModified:
return ApplyOpResultNoop, nil
case http.StatusConflict:
return ApplyOpResultConflict, ErrorFromNATS(reply)
default:
return ApplyOpResultError, ErrorFromNATS(reply)
}
return ErrorFromNATS(reply)
}

type CreateOption func(*createOptions)
Expand Down
12 changes: 6 additions & 6 deletions pkg/hz/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestReconcilerPanic(t *testing.T) {
// If we publish messages too quickly the reconciler will only get the last.
// Add a little sleep to make sure both messages get handled.
time.Sleep(time.Second)
err = dummyClient.Apply(ctx, do)
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)

done := make(chan struct{})
Expand Down Expand Up @@ -212,12 +212,12 @@ func TestReconcilerSlow(t *testing.T) {
},
}

err = dummyClient.Apply(ctx, do)
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)
// If we publish messages too quickly the reconciler will only get the last
// message, so add a minor delay.
time.Sleep(time.Millisecond * 100)
err = dummyClient.Apply(ctx, do)
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)

done := make(chan struct{})
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestReconcilerWaitForFinish(t *testing.T) {
},
}

err = dummyClient.Apply(ctx, do)
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)
// Wait just a moment, before stopping the controller.
time.Sleep(time.Millisecond * 100)
Expand Down Expand Up @@ -397,9 +397,9 @@ func TestReconcilerConcurrent(t *testing.T) {
err = childClient.Create(ctx, co)
tu.AssertNoError(t, err)
for i := 0; i < 50; i++ {
err = dummyClient.Apply(ctx, do)
_, err = dummyClient.Apply(ctx, do)
tu.AssertNoError(t, err)
err = childClient.Apply(ctx, co)
_, err = childClient.Apply(ctx, co)
tu.AssertNoError(t, err)
}
}()
Expand Down
12 changes: 10 additions & 2 deletions pkg/hz/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func ErrorFromNATS(msg *nats.Msg) error {
if err != nil {
return fmt.Errorf("invalid status header %q: %w", headerStatus, err)
}
if status == http.StatusOK {
if status >= http.StatusOK && status < http.StatusMultipleChoices {
return nil
}
return &Error{
Expand Down Expand Up @@ -143,13 +143,21 @@ func RespondError(
func RespondOK(
msg *nats.Msg,
body []byte,
) error {
return RespondStatus(msg, http.StatusOK, body)
}

func RespondStatus(
msg *nats.Msg,
status int,
body []byte,
) error {
if msg.Reply == "" {
return errors.New("no reply subject")
}
response := nats.NewMsg(msg.Reply)
response.Data = body
response.Header = make(nats.Header)
response.Header.Add(HeaderStatus, fmt.Sprintf("%d", http.StatusOK))
response.Header.Add(HeaderStatus, fmt.Sprintf("%d", status))
return msg.RespondMsg(response)
}
9 changes: 8 additions & 1 deletion pkg/hz/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,16 @@ func (e *PortalHandler) Start(ctx context.Context) error {
)
extClient := ObjectClient[Portal]{Client: client}
// TODO: field manager.
if err := extClient.Apply(ctx, e.ext); err != nil {
applyResult, err := extClient.Apply(ctx, e.ext)
if err != nil {
return fmt.Errorf("putting extension: %w", err)
}
switch applyResult {
case ApplyOpResultCreated:
slog.Info("created portal", "name", e.ext.Name)
case ApplyOpResultUpdated:
slog.Info("updated portal", "name", e.ext.Name)
}

// Subscribe to nats to receive http requests and proxy them to the handler.
subject := fmt.Sprintf(SubjectPortalRender, e.ext.Name)
Expand Down
28 changes: 21 additions & 7 deletions pkg/hzctl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,21 @@ type applyOptions struct {
data []byte
}

func (c *Client) Apply(ctx context.Context, opts ...ApplyOption) error {
func (c *Client) Apply(
ctx context.Context,
opts ...ApplyOption,
) (hz.ApplyOpResult, error) {
ao := applyOptions{}
for _, o := range opts {
o(&ao)
}

reqURL, err := url.JoinPath(c.Server, "v1", "objects")
if err != nil {
return fmt.Errorf("creating request url: %w", err)
return hz.ApplyOpResultError, fmt.Errorf(
"creating request url: %w",
err,
)
}

req, err := http.NewRequest(
Expand All @@ -140,7 +146,7 @@ func (c *Client) Apply(ctx context.Context, opts ...ApplyOption) error {
bytes.NewReader(ao.data),
)
if err != nil {
return fmt.Errorf("creating request: %w", err)
return hz.ApplyOpResultError, fmt.Errorf("creating request: %w", err)
}

req.Header.Add("Content-Type", "application/json")
Expand All @@ -149,14 +155,22 @@ func (c *Client) Apply(ctx context.Context, opts ...ApplyOption) error {

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("executing request: %w", err)
return hz.ApplyOpResultError, fmt.Errorf("executing request: %w", err)
}
defer resp.Body.Close()

if err := hz.ErrorFromHTTP(resp); err != nil {
return err
switch resp.StatusCode {
case http.StatusCreated:
return hz.ApplyOpResultCreated, nil
case http.StatusOK:
return hz.ApplyOpResultUpdated, nil
case http.StatusNotModified:
return hz.ApplyOpResultNoop, nil
case http.StatusConflict:
return hz.ApplyOpResultConflict, hz.ErrorFromHTTP(resp)
default:
return hz.ApplyOpResultError, hz.ErrorFromHTTP(resp)
}
return nil
}

type DeleteOption func(*deleteOptions)
Expand Down
Loading

0 comments on commit c966403

Please sign in to comment.