Skip to content

Commit

Permalink
feat: support proton cluster (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
ye11ow authored May 6, 2024
1 parent 3795b8c commit 04e8ad4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
19 changes: 18 additions & 1 deletion internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type TimeplusProviderModel struct {
Endpoint types.String `tfsdk:"endpoint"`
Workspace types.String `tfsdk:"workspace"`
ApiKey types.String `tfsdk:"api_key"`

// Ideally we should read this from stream definitions. However, there are 2 limitations
// 1. Proton cluster (e.g. replica = 3) doesn't allow stream with relicatoin_refactor equals other number (e.g. 2)
// 2. Proton get/list stream endpoint doesn't return relicatoin_refactor of the stream
// Thus, we currently define this `replicas` as a provider setting
Replicas types.Int64 `tfsdk:"replicas"`
}

func (p *TimeplusProvider) Metadata(ctx context.Context, _ provider.MetadataRequest, resp *provider.MetadataResponse) {
Expand Down Expand Up @@ -59,6 +65,11 @@ Use the navigation to the left to read about the available resources.`,
Required: true,
Sensitive: true,
},
"replicas": schema.Int64Attribute{
MarkdownDescription: "Number of Proton replicas",
Required: false,
Optional: true,
},
},
}
}
Expand All @@ -72,8 +83,14 @@ func (p *TimeplusProvider) Configure(ctx context.Context, req provider.Configure
return
}

var replicas *int
if !(data.Replicas.IsNull() || data.Replicas.IsUnknown()) {
valInt := int(*data.Replicas.ValueInt64Pointer())
replicas = &valInt
}

// Configuration values are now available.
client, err := timeplus.NewClient(data.Workspace.ValueString(), data.ApiKey.ValueString(), timeplus.ClientOptions{
client, err := timeplus.NewClient(data.Workspace.ValueString(), data.ApiKey.ValueString(), replicas, timeplus.ClientOptions{
BaseURL: data.Endpoint.ValueString(),
})
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions internal/timeplus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type resource interface {
type Client struct {
*http.Client

baseURL *url.URL
apiKey string
baseURL *url.URL
apiKey string
replicas *int
}

// optional configurations for the client
Expand All @@ -40,7 +41,7 @@ func DefaultOptions() ClientOptions {
}
}

func NewClient(workspaceID string, apiKey string, opts ClientOptions) (*Client, error) {
func NewClient(workspaceID string, apiKey string, replicas *int, opts ClientOptions) (*Client, error) {
ops := DefaultOptions()
ops.merge(opts)

Expand All @@ -51,9 +52,10 @@ func NewClient(workspaceID string, apiKey string, opts ClientOptions) (*Client,
baseURL = baseURL.JoinPath(workspaceID, "api", "v1beta2")

return &Client{
Client: http.DefaultClient,
baseURL: baseURL,
apiKey: apiKey,
Client: http.DefaultClient,
baseURL: baseURL,
apiKey: apiKey,
replicas: replicas,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions internal/timeplus/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (Stream) resourcePath() string {
}

func (c *Client) CreateStream(s *Stream) error {
if c.replicas != nil {
s.ReplicationFactor = *c.replicas
}

if err := c.post(s); err != nil {
return err
}
Expand Down

0 comments on commit 04e8ad4

Please sign in to comment.