From 19705a5a99a11a6c58b7a659da77215d52fa447f Mon Sep 17 00:00:00 2001 From: Fabrice Vaillant Date: Thu, 16 Feb 2023 11:49:47 +0100 Subject: [PATCH] Adds a method to change config per subject --- client.go | 27 +++++++++++++++++++++++---- client_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 68f02be..7b7ae1f 100644 --- a/client.go +++ b/client.go @@ -168,7 +168,7 @@ func (c *Client) IsRegistered(ctx context.Context, subject string, schema string // nolint // Error not possible here. - reqBody, _ := json.Marshal(&requestBody{Schema: schema}) + reqBody, _ := json.Marshal(requestBody{Schema: schema}) rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("subjects/%s", subject), bytes.NewReader(reqBody)) if IsSchemaNotFound(err) || IsSchemaNotFound(err) { @@ -205,7 +205,7 @@ func (c *Client) RegisterNewSchema(ctx context.Context, subject string, avroSche // nolint // Error not possible here. - reqBody, _ := json.Marshal(&requestBody{Schema: avroSchema}) + reqBody, _ := json.Marshal(requestBody{Schema: avroSchema}) rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("subjects/%s/versions", subject), bytes.NewReader(reqBody)) if err != nil { @@ -271,7 +271,7 @@ func (c *Client) GetConfig(ctx context.Context, subject string) (*Config, error) func (c *Client) SetGlobalConfig(ctx context.Context, config Config) (*Config, error) { // nolint // Error not possible here. - reqBody, _ := json.Marshal(&config) + reqBody, _ := json.Marshal(config) rawBody, err := c.execRequest(ctx, "PUT", "config", bytes.NewReader(reqBody)) if err != nil { @@ -287,6 +287,25 @@ func (c *Client) SetGlobalConfig(ctx context.Context, config Config) (*Config, e return &newConfig, nil } +func (c *Client) SetSubjectConfig(ctx context.Context, subject string, config Config) (*Config, error) { + // nolint + // Error not possible here. + reqBody, _ := json.Marshal(config) + + rawBody, err := c.execRequest(ctx, "PUT", fmt.Sprintf("config/%s", subject), bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + + var newConfig Config + err = json.Unmarshal(rawBody, &newConfig) + if err != nil { + return nil, fmt.Errorf("failed to decode the response: %s", err) + } + + return &newConfig, nil +} + func (c *Client) deleteSchemaVersion(ctx context.Context, subject string, version string, permanent bool) (int, error) { rawBody, err := c.execRequest(ctx, "DELETE", fmt.Sprintf("subjects/%s/versions/%s?permanent=%v", subject, version, permanent), nil) if err != nil { @@ -344,7 +363,7 @@ func (c *Client) SchemaCompatibleWith(ctx context.Context, schema string, subjec // nolint // Error not possible here. - reqBody, _ := json.Marshal(&requestBody{Schema: schema}) + reqBody, _ := json.Marshal(requestBody{Schema: schema}) rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("compatibility/subjects/%s/versions/%d", subject, version), bytes.NewReader(reqBody)) if err != nil { diff --git a/client_test.go b/client_test.go index ab8cb84..6d3e8c9 100644 --- a/client_test.go +++ b/client_test.go @@ -797,3 +797,49 @@ func Test_SetGlobalConfig_with_a_remote_error(t *testing.T) { assert.Nil(t, config) assert.EqualError(t, err, fmt.Sprintf("client: (PUT: %s/config) failed with error code 500: internal server error", ts.URL)) } + +func Test_SetSubjectConfig_success(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "PUT", r.Method) + assert.Equal(t, "/config/my-subject", r.URL.String()) + + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"compatibility": "BACKWARD"}`)) + require.NoError(t, err) + })) + defer ts.Close() + + client, err := NewClient(ts.URL) + require.NoError(t, err) + + config, err := client.SetSubjectConfig(context.Background(), "my-subject", Config{ + Compatibility: "BACKWARD", + }) + + assert.NoError(t, err) + assert.EqualValues(t, &Config{ + Compatibility: "BACKWARD", + }, config) +} + +func Test_SetSubjectConfig_remote_error(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnprocessableEntity) + _, err := w.Write([]byte(`{ + "error_code": 500, + "message": "internal server error" + }`)) + require.NoError(t, err) + })) + defer ts.Close() + + client, err := NewClient(ts.URL) + require.NoError(t, err) + + config, err := client.SetSubjectConfig(context.Background(), "my-subject", Config{ + Compatibility: "BACKWARD", + }) + + assert.Nil(t, config) + assert.EqualError(t, err, fmt.Sprintf("client: (PUT: %s/config/my-subject) failed with error code 500: internal server error", ts.URL)) +}