Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Adds a method to change config per subject #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *Client) IsRegistered(ctx context.Context, subject string, schema string

// nolint
// Error not possible here.
reqBody, _ := json.Marshal(&requestBody{Schema: schema})
reqBody, _ := json.Marshal(requestBody{Schema: schema})

rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("subjects/%s", subject), bytes.NewReader(reqBody))
if IsSchemaNotFound(err) || IsSchemaNotFound(err) {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (c *Client) RegisterNewSchema(ctx context.Context, subject string, avroSche

// nolint
// Error not possible here.
reqBody, _ := json.Marshal(&requestBody{Schema: avroSchema})
reqBody, _ := json.Marshal(requestBody{Schema: avroSchema})

rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("subjects/%s/versions", subject), bytes.NewReader(reqBody))
if err != nil {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *Client) GetConfig(ctx context.Context, subject string) (*Config, error)
func (c *Client) SetGlobalConfig(ctx context.Context, config Config) (*Config, error) {
// nolint
// Error not possible here.
reqBody, _ := json.Marshal(&config)
reqBody, _ := json.Marshal(config)

rawBody, err := c.execRequest(ctx, "PUT", "config", bytes.NewReader(reqBody))
if err != nil {
Expand All @@ -287,6 +287,25 @@ func (c *Client) SetGlobalConfig(ctx context.Context, config Config) (*Config, e
return &newConfig, nil
}

func (c *Client) SetSubjectConfig(ctx context.Context, subject string, config Config) (*Config, error) {
// nolint
// Error not possible here.
reqBody, _ := json.Marshal(config)

rawBody, err := c.execRequest(ctx, "PUT", fmt.Sprintf("config/%s", subject), bytes.NewReader(reqBody))
if err != nil {
return nil, err
}

var newConfig Config
err = json.Unmarshal(rawBody, &newConfig)
if err != nil {
return nil, fmt.Errorf("failed to decode the response: %s", err)
}

return &newConfig, nil
}

func (c *Client) deleteSchemaVersion(ctx context.Context, subject string, version string, permanent bool) (int, error) {
rawBody, err := c.execRequest(ctx, "DELETE", fmt.Sprintf("subjects/%s/versions/%s?permanent=%v", subject, version, permanent), nil)
if err != nil {
Expand Down Expand Up @@ -344,7 +363,7 @@ func (c *Client) SchemaCompatibleWith(ctx context.Context, schema string, subjec

// nolint
// Error not possible here.
reqBody, _ := json.Marshal(&requestBody{Schema: schema})
reqBody, _ := json.Marshal(requestBody{Schema: schema})

rawBody, err := c.execRequest(ctx, "POST", fmt.Sprintf("compatibility/subjects/%s/versions/%d", subject, version), bytes.NewReader(reqBody))
if err != nil {
Expand Down
46 changes: 46 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,49 @@ func Test_SetGlobalConfig_with_a_remote_error(t *testing.T) {
assert.Nil(t, config)
assert.EqualError(t, err, fmt.Sprintf("client: (PUT: %s/config) failed with error code 500: internal server error", ts.URL))
}

func Test_SetSubjectConfig_success(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "PUT", r.Method)
assert.Equal(t, "/config/my-subject", r.URL.String())

w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"compatibility": "BACKWARD"}`))
require.NoError(t, err)
}))
defer ts.Close()

client, err := NewClient(ts.URL)
require.NoError(t, err)

config, err := client.SetSubjectConfig(context.Background(), "my-subject", Config{
Compatibility: "BACKWARD",
})

assert.NoError(t, err)
assert.EqualValues(t, &Config{
Compatibility: "BACKWARD",
}, config)
}

func Test_SetSubjectConfig_remote_error(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnprocessableEntity)
_, err := w.Write([]byte(`{
"error_code": 500,
"message": "internal server error"
}`))
require.NoError(t, err)
}))
defer ts.Close()

client, err := NewClient(ts.URL)
require.NoError(t, err)

config, err := client.SetSubjectConfig(context.Background(), "my-subject", Config{
Compatibility: "BACKWARD",
})

assert.Nil(t, config)
assert.EqualError(t, err, fmt.Sprintf("client: (PUT: %s/config/my-subject) failed with error code 500: internal server error", ts.URL))
}