diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 3c5c2d6..74c4441 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -32,6 +32,12 @@ type TimeplusProviderModel struct { Endpoint types.String `tfsdk:"endpoint"` Workspace types.String `tfsdk:"workspace"` ApiKey types.String `tfsdk:"api_key"` + + // Ideally we should read this from stream definitions. However, there are 2 limitations + // 1. Proton cluster (e.g. replica = 3) doesn't allow stream with relicatoin_refactor equals other number (e.g. 2) + // 2. Proton get/list stream endpoint doesn't return relicatoin_refactor of the stream + // Thus, we currently define this `replicas` as a provider setting + Replicas types.Int64 `tfsdk:"replicas"` } func (p *TimeplusProvider) Metadata(ctx context.Context, _ provider.MetadataRequest, resp *provider.MetadataResponse) { @@ -59,6 +65,11 @@ Use the navigation to the left to read about the available resources.`, Required: true, Sensitive: true, }, + "replicas": schema.Int64Attribute{ + MarkdownDescription: "Number of Proton replicas", + Required: false, + Optional: true, + }, }, } } @@ -72,8 +83,14 @@ func (p *TimeplusProvider) Configure(ctx context.Context, req provider.Configure return } + var replicas *int + if !(data.Replicas.IsNull() || data.Replicas.IsUnknown()) { + valInt := int(*data.Replicas.ValueInt64Pointer()) + replicas = &valInt + } + // Configuration values are now available. - client, err := timeplus.NewClient(data.Workspace.ValueString(), data.ApiKey.ValueString(), timeplus.ClientOptions{ + client, err := timeplus.NewClient(data.Workspace.ValueString(), data.ApiKey.ValueString(), replicas, timeplus.ClientOptions{ BaseURL: data.Endpoint.ValueString(), }) if err != nil { diff --git a/internal/timeplus/client.go b/internal/timeplus/client.go index 81be2b9..2cc7d97 100644 --- a/internal/timeplus/client.go +++ b/internal/timeplus/client.go @@ -19,8 +19,9 @@ type resource interface { type Client struct { *http.Client - baseURL *url.URL - apiKey string + baseURL *url.URL + apiKey string + replicas *int } // optional configurations for the client @@ -40,7 +41,7 @@ func DefaultOptions() ClientOptions { } } -func NewClient(workspaceID string, apiKey string, opts ClientOptions) (*Client, error) { +func NewClient(workspaceID string, apiKey string, replicas *int, opts ClientOptions) (*Client, error) { ops := DefaultOptions() ops.merge(opts) @@ -51,9 +52,10 @@ func NewClient(workspaceID string, apiKey string, opts ClientOptions) (*Client, baseURL = baseURL.JoinPath(workspaceID, "api", "v1beta2") return &Client{ - Client: http.DefaultClient, - baseURL: baseURL, - apiKey: apiKey, + Client: http.DefaultClient, + baseURL: baseURL, + apiKey: apiKey, + replicas: replicas, }, nil } diff --git a/internal/timeplus/stream.go b/internal/timeplus/stream.go index 4720422..234050e 100644 --- a/internal/timeplus/stream.go +++ b/internal/timeplus/stream.go @@ -84,6 +84,10 @@ func (Stream) resourcePath() string { } func (c *Client) CreateStream(s *Stream) error { + if c.replicas != nil { + s.ReplicationFactor = *c.replicas + } + if err := c.post(s); err != nil { return err }