Skip to content

Commit

Permalink
feat(dws/cluster): the cluster resource supports scale-in nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
wuzhuanhong committed Oct 12, 2024
1 parent dd643b8 commit f382463
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 55 deletions.
4 changes: 4 additions & 0 deletions docs/resources/dws_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ The following arguments are supported:

* `lts_enable` - (Optional, Bool) Specified whether to enable LTS. The default value is **false**.

* `force_backup` - (Optional, Bool) Specified whether to automatically execute snapshot when shrinking the number of nodes.
The default value is **true**.
This parameter is required and available only when scaling-in the `number_of_node` parameter value.

<a name="DwsCluster_PublicIp"></a>
The `PublicIp` block supports:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func getClusterResourceFunc(cfg *config.Config, state *terraform.ResourceState)
// getDwsCluster: Query the DWS cluster.
getDwsClusterClient, err := cfg.NewServiceClient("dws", region)
if err != nil {
return nil, fmt.Errorf("error creating DWS Client: %s", err)
return nil, fmt.Errorf("error creating DWS client: %s", err)
}

getDwsClusterRespBody, err := dws.GetClusterInfoByClusterId(getDwsClusterClient, state.Primary.ID)
if err != nil {
return nil, fmt.Errorf("error retrieving DwsCluster: %s", err)
return nil, fmt.Errorf("error retrieving DWS cluster: %s", err)
}

return getDwsClusterRespBody, nil
Expand Down Expand Up @@ -84,6 +84,7 @@ func TestAccResourceCluster_basicV1(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "name", name),
resource.TestCheckResourceAttr(resourceName, "number_of_node", "3"),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.public_bind_type", dws.PublicBindTypeNotUse),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.eip_id", ""),
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
Expand All @@ -97,7 +98,7 @@ func TestAccResourceCluster_basicV1(t *testing.T) {
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "logical_cluster_enable"},
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "logical_cluster_enable", "force_backup"},
},
},
})
Expand Down Expand Up @@ -201,7 +202,7 @@ data "huaweicloud_availability_zones" "test" {}
resource "huaweicloud_dws_cluster" "test" {
name = "%[3]s"
node_type = "dwsk2.xlarge"
number_of_node = 6
number_of_node = 3
vpc_id = huaweicloud_vpc.test.id
network_id = huaweicloud_vpc_subnet.test.id
security_group_id = huaweicloud_networking_secgroup.test.id
Expand Down Expand Up @@ -293,6 +294,7 @@ func TestAccResourceCluster_basicV2(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "name", name),
resource.TestCheckResourceAttr(resourceName, "number_of_node", "3"),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.public_bind_type", dws.PublicBindTypeNotUse),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.eip_id", ""),
resource.TestCheckResourceAttr(resourceName, "elb.0.name", ""),
Expand All @@ -308,7 +310,7 @@ func TestAccResourceCluster_basicV2(t *testing.T) {
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "lts_enable",
"logical_cluster_enable", "elb_id"},
"logical_cluster_enable", "elb_id", "force_backup"},
},
},
})
Expand Down Expand Up @@ -433,7 +435,8 @@ func testAccDwsCluster_basicV2_step3(rName, password string) string {
resource "huaweicloud_dws_cluster" "testv2" {
name = "%[2]s"
node_type = "dwsk2.xlarge"
number_of_node = 6
number_of_node = 3
force_backup = false
vpc_id = huaweicloud_vpc.test.id
network_id = huaweicloud_vpc_subnet.test.id
security_group_id = huaweicloud_networking_secgroup.test.id
Expand Down
158 changes: 109 additions & 49 deletions huaweicloud/services/dws/resource_huaweicloud_dws_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const ClusterIdIllegalErrCode = "DWS.0001"
// @API DWS POST /v1/{project_id}/clusters/{cluster_id}/lts-logs/disable
// @API DWS POST /v2/{project_id}/clusters/{cluster_id}/eips/{eip_id}
// @API DWS DELETE /v2/{project_id}/clusters/{cluster_id}/eips/{eip_id}
// @API DWS POST /v1.0/{project_id}/clusters/{cluster_id}/cluster-shrink
func ResourceDwsCluster() *schema.Resource {
return &schema.Resource{
CreateContext: resourceDwsClusterCreate,
Expand Down Expand Up @@ -210,6 +211,12 @@ func ResourceDwsCluster() *schema.Resource {
Optional: true,
Description: `Specified whether to enable LTS.`,
},
"force_backup": {
Type: schema.TypeBool,
Optional: true,
Default: true,
Description: `Whether to automatically execute snapshot when shrinking the number of nodes.`,
},
"status": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -459,7 +466,7 @@ func resourceDwsClusterCreateV2(ctx context.Context, d *schema.ResourceData, met
)
createDwsClusterClient, err := cfg.NewServiceClient(createDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

createDwsClusterPath := createDwsClusterClient.Endpoint + createDwsClusterHttpUrl
Expand All @@ -473,7 +480,7 @@ func resourceDwsClusterCreateV2(ctx context.Context, d *schema.ResourceData, met
createDwsClusterOpt.JSONBody = utils.RemoveNil(buildCreateDwsClusterBodyParams(d, cfg))
createDwsClusterResp, err := createDwsClusterClient.Request("POST", createDwsClusterPath, &createDwsClusterOpt)
if err != nil {
return diag.Errorf("error creating DWS Cluster: %s", err)
return diag.Errorf("error creating DWS cluster: %s", err)
}

createDwsClusterRespBody, err := utils.FlattenResponse(createDwsClusterResp)
Expand All @@ -483,7 +490,7 @@ func resourceDwsClusterCreateV2(ctx context.Context, d *schema.ResourceData, met

id, err := jmespath.Search("cluster.id", createDwsClusterRespBody)
if err != nil {
return diag.Errorf("error creating DWS Cluster: ID is not found in API response")
return diag.Errorf("error creating DWS cluster: ID is not found in API response")
}
d.SetId(id.(string))

Expand Down Expand Up @@ -529,7 +536,7 @@ func resourceDwsClusterCreateV1(ctx context.Context, d *schema.ResourceData, met
)
createDwsClusterClient, err := cfg.NewServiceClient(createDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

createDwsClusterPath := createDwsClusterClient.Endpoint + createDwsClusterHttpUrl
Expand All @@ -543,7 +550,7 @@ func resourceDwsClusterCreateV1(ctx context.Context, d *schema.ResourceData, met
createDwsClusterOpt.JSONBody = utils.RemoveNil(buildCreateDwsClusterBodyParamsV1(d, cfg))
createDwsClusterResp, err := createDwsClusterClient.Request("POST", createDwsClusterPath, &createDwsClusterOpt)
if err != nil {
return diag.Errorf("error creating DWS Cluster: %s", err)
return diag.Errorf("error creating DWS cluster: %s", err)
}

createDwsClusterRespBody, err := utils.FlattenResponse(createDwsClusterResp)
Expand All @@ -553,7 +560,7 @@ func resourceDwsClusterCreateV1(ctx context.Context, d *schema.ResourceData, met

id, err := jmespath.Search("cluster.id", createDwsClusterRespBody)
if err != nil {
return diag.Errorf("error creating DWS Cluster: ID is not found in API response")
return diag.Errorf("error creating DWS cluster: ID is not found in API response")
}
d.SetId(id.(string))

Expand Down Expand Up @@ -735,16 +742,12 @@ func resourceDwsClusterRead(_ context.Context, d *schema.ResourceData, meta inte
// getDwsCluster: Query the DWS cluster.
getDwsClusterClient, err := cfg.NewServiceClient("dws", region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

getDwsClusterRespBody, err := GetClusterInfoByClusterId(getDwsClusterClient, d.Id())
if err != nil {
return common.CheckDeletedDiag(d, err, "error retrieving DWS Cluster")
}

if err != nil {
return diag.FromErr(err)
return common.CheckDeletedDiag(d, err, "error retrieving DWS cluster")
}

mErr = multierror.Append(
Expand Down Expand Up @@ -905,7 +908,7 @@ func resourceDwsClusterUpdate(ctx context.Context, d *schema.ResourceData, meta
expandInstanceStorageOpt.JSONBody = utils.RemoveNil(buildExpandInstanceStorageBodyParams(d))
_, err = clusterClient.Request("POST", expandInstanceStoragePath, &expandInstanceStorageOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
return diag.Errorf("error updating DWS cluster: %s", err)
}
err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
Expand Down Expand Up @@ -934,42 +937,18 @@ func resourceDwsClusterUpdate(ctx context.Context, d *schema.ResourceData, meta
resetPasswordOfClusterOpt.JSONBody = utils.RemoveNil(buildResetPasswordOfClusterBodyParams(d))
_, err = clusterClient.Request("POST", resetPasswordOfClusterPath, &resetPasswordOfClusterOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
return diag.Errorf("error updating DWS cluster: %s", err)
}

err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return diag.Errorf("error waiting for the DWS cluster (%s) update to complete: %s", clusterId, err)
}
}
scaleOutClusterChanges := []string{
"number_of_node",
}

if d.HasChanges(scaleOutClusterChanges...) {
// scaleOutCluster: Scale out DWS cluster
var (
scaleOutClusterHttpUrl = "v1.0/{project_id}/clusters/{cluster_id}/resize"
)

scaleOutClusterPath := clusterClient.Endpoint + scaleOutClusterHttpUrl
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{project_id}", clusterClient.ProjectID)
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{cluster_id}", d.Id())

scaleOutClusterOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
}

scaleOutClusterOpt.JSONBody = utils.RemoveNil(buildScaleOutClusterBodyParams(d))
_, err = clusterClient.Request("POST", scaleOutClusterPath, &scaleOutClusterOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
}

err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return diag.Errorf("error waiting for the DWS cluster (%s) update to complete: %s", clusterId, err)
if d.HasChange("number_of_node") {
if err := updateClusterNodes(ctx, clusterClient, d, clusterId); err != nil {
return diag.FromErr(err)
}
}

Expand Down Expand Up @@ -1054,16 +1033,97 @@ func buildResetPasswordOfClusterBodyParams(d *schema.ResourceData) map[string]in
return bodyParams
}

func buildScaleOutClusterBodyParams(d *schema.ResourceData) map[string]interface{} {
func updateClusterNodes(ctx context.Context, client *golangsdk.ServiceClient, d *schema.ResourceData, clusterId string) error {
oldValue, newValue := d.GetChange("number_of_node")
num := newValue.(int) - oldValue.(int)
nodeNum := newValue.(int) - oldValue.(int)
if nodeNum > 0 {
if err := scaleOutCluster(client, clusterId, nodeNum); err != nil {
return err
}
} else {
// If you call the query progress interface, when the task progress is 100%, it does not mean that the shrinking operation is completed.
// It is possible that the node is being deleted. Therefore, use the query details interface to determine whether
// the shrinking operation is completed.
if err := scaleInCluster(client, d, clusterId, -nodeNum); err != nil {
return err
}
}

bodyParams := map[string]interface{}{
"scale_out": map[string]interface{}{
"count": num,
if err := waitClusterTaskStateCompleted(ctx, client, d.Timeout(schema.TimeoutUpdate), clusterId); err != nil {

Check failure on line 1052 in huaweicloud/services/dws/resource_huaweicloud_dws_cluster.go

View workflow job for this annotation

GitHub Actions / build

undefined: waitClusterTaskStateCompleted

Check failure on line 1052 in huaweicloud/services/dws/resource_huaweicloud_dws_cluster.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: waitClusterTaskStateCompleted (typecheck)

Check failure on line 1052 in huaweicloud/services/dws/resource_huaweicloud_dws_cluster.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: waitClusterTaskStateCompleted) (typecheck)
return err
}

return nil
}

func scaleOutCluster(client *golangsdk.ServiceClient, clusterId string, scaleOutNodes int) error {
scaleOutClusterHttpUrl := "v1.0/{project_id}/clusters/{cluster_id}/resize"
scaleOutClusterPath := client.Endpoint + scaleOutClusterHttpUrl
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{project_id}", client.ProjectID)
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{cluster_id}", clusterId)

scaleOutClusterOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
JSONBody: map[string]interface{}{
"scale_out": map[string]interface{}{
"count": scaleOutNodes,
},
},
}
return bodyParams
_, err := client.Request("POST", scaleOutClusterPath, &scaleOutClusterOpt)
if err != nil {
return fmt.Errorf("error extending nodes of the DWS cluster (%s) : %s", clusterId, err)
}
return nil
}

func buildScaleInBodyParams(d *schema.ResourceData, shrinkNum int, datastoreType string) map[string]interface{} {
return map[string]interface{}{
"shrink_number": shrinkNum,
"force_backup": d.Get("force_backup"),
"type": datastoreType,
}
}

func scaleInCluster(client *golangsdk.ServiceClient, d *schema.ResourceData, clusterId string, shrinkNum int) error {
clusterResp, err := GetClusterInfoByClusterId(client, clusterId)
if err != nil {
return err
}

datastoreType := utils.PathSearch("cluster.datastore_type", clusterResp, "").(string)
if datastoreType == "" {
return fmt.Errorf("unable to get datastore type of the cluster (%s)", clusterId)
}

httpUrl := "v1.0/{project_id}/clusters/{cluster_id}/cluster-shrink"
scaleInPath := client.Endpoint + httpUrl
scaleInPath = strings.ReplaceAll(scaleInPath, "{project_id}", client.ProjectID)
scaleInPath = strings.ReplaceAll(scaleInPath, "{cluster_id}", d.Id())

opt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
JSONBody: utils.RemoveNil(buildScaleInBodyParams(d, shrinkNum, datastoreType)),
}
resp, err := client.Request("POST", scaleInPath, &opt)
if err != nil {
return err
}

respBody, err := utils.FlattenResponse(resp)
if err != nil {
return err
}

// In some cases, the status code is 200 when the scaling-in fails.
// If `job_id` is not empty, it means the interface is sent successfully.
jobId := utils.PathSearch("job_id", respBody, "").(string)
if jobId == "" {
return fmt.Errorf("error shrinking nodes of the DWS cluster (%s) : %s", clusterId, err)
}
return nil
}

func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
Expand All @@ -1077,7 +1137,7 @@ func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta
)
deleteDwsClusterClient, err := cfg.NewServiceClient(deleteDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

deleteDwsClusterPath := deleteDwsClusterClient.Endpoint + deleteDwsClusterHttpUrl
Expand All @@ -1092,7 +1152,7 @@ func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta
deleteDwsClusterOpt.JSONBody = utils.RemoveNil(buildDeleteDwsClusterBodyParams(d))
_, err = deleteDwsClusterClient.Request("DELETE", deleteDwsClusterPath, &deleteDwsClusterOpt)
if err != nil {
return diag.Errorf("error deleting DWS Cluster: %s", err)
return diag.Errorf("error deleting DWS cluster: %s", err)
}

err = deleteClusterWaitingForCompleted(ctx, d, deleteDwsClusterClient, d.Timeout(schema.TimeoutDelete))
Expand Down

0 comments on commit f382463

Please sign in to comment.