Skip to content

Commit

Permalink
Merge pull request futurewei-cloud#89 from phudtran/delete-fix
Browse files Browse the repository at this point in the history
Fix Merak-Compute Delete
  • Loading branch information
cj-chung authored Aug 29, 2022
2 parents 15b13bf + fb1cb21 commit 51fc0ba
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 60 deletions.
6 changes: 3 additions & 3 deletions services/merak-compute/activities/vm_create_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"strings"

agent_pb "github.com/futurewei-cloud/merak/api/proto/v1/agent"
common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
constants "github.com/futurewei-cloud/merak/services/common"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
"go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -47,7 +47,7 @@ func VmCreate(ctx context.Context, vmID string) error {
client := agent_pb.NewMerakAgentServiceClient(conn)
logger.Info("Sending to agent at" + podIP)
port := agent_pb.InternalPortConfig{
OperationType: common_pb.OperationType_CREATE,
OperationType: commonPB.OperationType_CREATE,
Name: common.RedisClient.HGet(ctx, vmID, "name").Val(),
Vpcid: common.RedisClient.HGet(ctx, vmID, "vpc").Val(),
Tenantid: common.RedisClient.HGet(ctx, vmID, "tenantID").Val(),
Expand All @@ -74,7 +74,7 @@ func VmCreate(ctx context.Context, vmID string) error {
}

// Update DB with device information
if resp.ReturnCode == common_pb.ReturnCode_OK {
if resp.ReturnCode == commonPB.ReturnCode_OK {
ip := resp.Port.GetIp()
status := resp.Port.GetStatus()
deviceID := resp.Port.GetDeviceid()
Expand Down
22 changes: 13 additions & 9 deletions services/merak-compute/activities/vm_delete_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ import (
"strconv"
"strings"

agent_pb "github.com/futurewei-cloud/merak/api/proto/v1/agent"
common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
agentPB "github.com/futurewei-cloud/merak/api/proto/v1/agent"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
constants "github.com/futurewei-cloud/merak/services/common"

"github.com/futurewei-cloud/merak/services/merak-compute/common"
"go.temporal.io/sdk/activity"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// Deletes a VM given by the vmID
func VmDelete(ctx context.Context, vmID string) error {
func VmDelete(ctx context.Context, vmID string) (string, error) {
logger := activity.GetLogger(ctx)

podIP := common.RedisClient.HGet(ctx, vmID, "hostIP").Val()
Expand All @@ -40,11 +41,11 @@ func VmDelete(ctx context.Context, vmID string) error {
conn, err := grpc.Dial(agent_address.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Info("Failed to dial gRPC server address: "+agent_address.String(), err)
return err
return vmID, err
}
client := agent_pb.NewMerakAgentServiceClient(conn)
port := agent_pb.InternalPortConfig{
OperationType: common_pb.OperationType_DELETE,
client := agentPB.NewMerakAgentServiceClient(conn)
port := agentPB.InternalPortConfig{
OperationType: commonPB.OperationType_DELETE,
Name: common.RedisClient.HGet(ctx, vmID, "name").Val(),
Projectid: common.RedisClient.HGet(ctx, vmID, "projectID").Val(),
Deviceid: common.RedisClient.HGet(ctx, vmID, "deviceID").Val(),
Expand All @@ -54,7 +55,10 @@ func VmDelete(ctx context.Context, vmID string) error {
resp, err := client.PortHandler(ctx, &port)
if err != nil {
logger.Error("Unable delete vm ID " + podIP + "Reason: " + resp.GetReturnMessage() + "\n")
return err
return vmID, err
}
return nil
common.RedisClient.HDel(ctx, vmID) // VM Detail hashmap
common.RedisClient.SRem(ctx, constants.COMPUTE_REDIS_VM_SET, vmID) // Set of all VM IDs

return vmID, nil
}
10 changes: 5 additions & 5 deletions services/merak-compute/activities/vm_info_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"strconv"

common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
pb "github.com/futurewei-cloud/merak/api/proto/v1/compute"
constants "github.com/futurewei-cloud/merak/services/common"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
Expand All @@ -33,7 +33,7 @@ func VmInfo(ctx context.Context) (*pb.ReturnComputeMessage, error) {
logger.Error("Unable get VM IDs from redis", ids.Err())

return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
ReturnMessage: "Unable get node IDs from redis",
}, ids.Err()
}
Expand All @@ -53,17 +53,17 @@ func VmInfo(ctx context.Context) (*pb.ReturnComputeMessage, error) {
if err != nil {
logger.Error("Failed to convert status string to int!", err)
return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
ReturnMessage: "Failed to convert status string to int!",
Vms: vms,
}, err
}
vm.Status = common_pb.Status(status)
vm.Status = commonPB.Status(status)
vms = append(vms, &vm)
}

return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_OK,
ReturnCode: commonPB.ReturnCode_OK,
ReturnMessage: "Success!",
Vms: vms,
}, nil
Expand Down
34 changes: 17 additions & 17 deletions services/merak-compute/handler/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"log"
"strconv"

common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
pb "github.com/futurewei-cloud/merak/api/proto/v1/compute"
constants "github.com/futurewei-cloud/merak/services/common"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
Expand All @@ -38,7 +38,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
}

log.Println("Operation Create")
return_vms := []*pb.InternalVMInfo{}
returnVMs := []*pb.InternalVMInfo{}
// Add pods to DB
for n, pod := range in.Config.Pods {
if err := RedisClient.HSet(
Expand All @@ -51,7 +51,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
).Err(); err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable add pod to DB Hash Map",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
log.Println("Added pod " + pod.Name + " at address " + pod.ContainerIp)
Expand All @@ -62,7 +62,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
).Err(); err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to add pod to DB Hash Set",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}

Expand All @@ -79,7 +79,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
).Err(); err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to VM to DB Hash Set",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
if err := RedisClient.HSet(
Expand All @@ -101,28 +101,28 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
).Err(); err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable add VM to DB Hash Map",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
return_vm := pb.InternalVMInfo{
returnVM := pb.InternalVMInfo{
Id: vmID,
Name: "v" + suffix,
VpcId: vpc.VpcId,
Ip: "",
SecurityGroupId: in.Config.VmDeploy.Secgroups[0],
SubnetId: subnet.SubnetId,
DefaultGateway: subnet.SubnetGw,
Status: common_pb.Status(1),
Status: commonPB.Status(1),
}
return_vms = append(return_vms, &return_vm)
returnVMs = append(returnVMs, &returnVM)
// Store VM to Pod list
log.Println("Added VM " + vmID + " for vpc " + vpc.VpcId + " for subnet " + subnet.SubnetId + " vm number " + strconv.Itoa(k+1) + " of " + strconv.Itoa(int(subnet.NumberVms)))
if err := RedisClient.LPush(ctx, "l"+pod.Id, vmID).Err(); err != nil {
log.Println("Failed to add pod -> vm mapping " + vmID)
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable add VM to pod list",
ReturnCode: common_pb.ReturnCode_FAILED,
Vms: return_vms,
ReturnCode: commonPB.ReturnCode_FAILED,
Vms: returnVMs,
}, err
}
log.Println("Added pod -> vm mapping " + vmID)
Expand All @@ -134,9 +134,9 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
if vms.Err() != nil {
log.Println("Unable get node vmIDsList from redis", vms.Err())
return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
ReturnMessage: "Unable get node vmIDsList from redis",
Vms: return_vms,
Vms: returnVMs,
}, vms.Err()
}

Expand All @@ -152,16 +152,16 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
if err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to execute create workflow",
ReturnCode: common_pb.ReturnCode_FAILED,
Vms: return_vms,
ReturnCode: commonPB.ReturnCode_FAILED,
Vms: returnVMs,
}, err
}
log.Println("Started Create workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID())
}

return &pb.ReturnComputeMessage{
ReturnMessage: "Successfully started all create workflows!",
ReturnCode: common_pb.ReturnCode_OK,
Vms: return_vms,
ReturnCode: commonPB.ReturnCode_OK,
Vms: returnVMs,
}, nil
}
33 changes: 15 additions & 18 deletions services/merak-compute/handler/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"log"
"strconv"

common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
pb "github.com/futurewei-cloud/merak/api/proto/v1/compute"
constants "github.com/futurewei-cloud/merak/services/common"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
Expand All @@ -38,37 +38,37 @@ func caseDelete(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
}

log.Println("Operation Delete")
pod_list := RedisClient.SMembers(
podList := RedisClient.SMembers(
ctx,
constants.COMPUTE_REDIS_NODE_IP_SET,
)
if pod_list.Err() != nil {
log.Println("Unable get VM IDs from redis", pod_list.Err())
if podList.Err() != nil {
log.Println("Unable get VM IDs from redis", podList.Err())

return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
ReturnMessage: "Unable get node IDs from redis",
}, pod_list.Err()
}, podList.Err()
}
// Get list of all vms in pod
for n, pod_id := range pod_list.Val() {
vms := RedisClient.LRange(ctx, "l"+pod_id, 0, -1)
for n, podID := range podList.Val() {
vms := RedisClient.LRange(ctx, "l"+podID, 0, -1)
if vms.Err() != nil {
log.Println("Unable get node vmIDsList from redis", vms.Err())
return &pb.ReturnComputeMessage{
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
ReturnMessage: "Unable get node vmIDsList from redis",
}, vms.Err()
}
for _, vm_id := range vms.Val() {
for _, vmID := range vms.Val() {
if err := RedisClient.HSet(
ctx,
vm_id,
vmID,
"status", "3",
).Err(); err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to set VM status to deleting in DB Hash Map",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
}
Expand All @@ -78,21 +78,18 @@ func caseDelete(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu
RetryPolicy: retrypolicy,
}
log.Println("Executing VM Delete Workflow!")
we, err := TemporalClient.ExecuteWorkflow(context.Background(), workflowOptions, delete.Delete, vms.Val())
we, err := TemporalClient.ExecuteWorkflow(context.Background(), workflowOptions, delete.Delete, vms.Val(), podID)
if err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to execute delete workflow",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
log.Println("Started Delete workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID())
}

log.Println("Deleting all VMs from DB")
RedisClient.FlushAll(ctx)

return &pb.ReturnComputeMessage{
ReturnMessage: "Successfully started all delete workflows!",
ReturnCode: common_pb.ReturnCode_OK,
ReturnCode: commonPB.ReturnCode_OK,
}, nil
}
6 changes: 3 additions & 3 deletions services/merak-compute/handler/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"context"
"log"

common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common"
commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common"
pb "github.com/futurewei-cloud/merak/api/proto/v1/compute"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
"github.com/futurewei-cloud/merak/services/merak-compute/workflows/info"
Expand Down Expand Up @@ -51,7 +51,7 @@ func caseInfo(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Return
if err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: "Unable to execute info workflow",
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
}, err
}
log.Println("Started workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID())
Expand All @@ -61,7 +61,7 @@ func caseInfo(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Return
if err != nil {
return &pb.ReturnComputeMessage{
ReturnMessage: result.GetReturnMessage(),
ReturnCode: common_pb.ReturnCode_FAILED,
ReturnCode: commonPB.ReturnCode_FAILED,
Vms: result.GetVms(),
}, err
}
Expand Down
24 changes: 19 additions & 5 deletions services/merak-compute/workflows/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ Copyright(c) 2022 Futurewei Cloud
package delete

import (
"context"
"strings"

constants "github.com/futurewei-cloud/merak/services/common"
"github.com/futurewei-cloud/merak/services/merak-compute/activities"
"github.com/futurewei-cloud/merak/services/merak-compute/common"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

func Delete(ctx workflow.Context, vms []string) (err error) {
func Delete(ctx workflow.Context, vms []string, podID string) (err error) {
retrypolicy := &temporal.RetryPolicy{
InitialInterval: common.TEMPORAL_ACTIVITY_RETRY_INTERVAL,
BackoffCoefficient: common.TEMPORAL_ACTIVITY_BACKOFF,
Expand All @@ -43,14 +45,26 @@ func Delete(ctx workflow.Context, vms []string) (err error) {
futures = append(futures, future)
}
logger.Info("Started VmDelete workflows for vms" + strings.Join(vms, " "))
var vmID string
wfContext := context.Background()
for _, future := range futures {
err = future.Get(ctx, nil)
logger.Info("Activity completed!")
err = future.Get(ctx, &vmID)
if err != nil {
return
logger.Error("Failed to delete VM ID " + vmID)
return err
}

logger.Info("Deleted VM ID " + vmID)

// Delete Single VM from DB
common.RedisClient.LRem(wfContext, "l"+podID, 1, vmID)
}
logger.Info("All activities completed")
// All VMs on pod have been deleted.
// Delete all Pod/VM associations from DB
common.RedisClient.HDel(wfContext, podID)
common.RedisClient.SRem(wfContext, constants.COMPUTE_REDIS_NODE_IP_SET, podID)
common.RedisClient.Del(wfContext, "l"+podID)
logger.Info("All VMs for pod " + podID + " deleted!")
return nil

}

0 comments on commit 51fc0ba

Please sign in to comment.