diff --git a/services/merak-compute/activities/vm_create_activity.go b/services/merak-compute/activities/vm_create_activity.go index e6f16be8..5c68c20a 100644 --- a/services/merak-compute/activities/vm_create_activity.go +++ b/services/merak-compute/activities/vm_create_activity.go @@ -19,7 +19,7 @@ import ( "strings" agent_pb "github.com/futurewei-cloud/merak/api/proto/v1/agent" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" constants "github.com/futurewei-cloud/merak/services/common" "github.com/futurewei-cloud/merak/services/merak-compute/common" "go.temporal.io/sdk/activity" @@ -47,7 +47,7 @@ func VmCreate(ctx context.Context, vmID string) error { client := agent_pb.NewMerakAgentServiceClient(conn) logger.Info("Sending to agent at" + podIP) port := agent_pb.InternalPortConfig{ - OperationType: common_pb.OperationType_CREATE, + OperationType: commonPB.OperationType_CREATE, Name: common.RedisClient.HGet(ctx, vmID, "name").Val(), Vpcid: common.RedisClient.HGet(ctx, vmID, "vpc").Val(), Tenantid: common.RedisClient.HGet(ctx, vmID, "tenantID").Val(), @@ -74,7 +74,7 @@ func VmCreate(ctx context.Context, vmID string) error { } // Update DB with device information - if resp.ReturnCode == common_pb.ReturnCode_OK { + if resp.ReturnCode == commonPB.ReturnCode_OK { ip := resp.Port.GetIp() status := resp.Port.GetStatus() deviceID := resp.Port.GetDeviceid() diff --git a/services/merak-compute/activities/vm_delete_activity.go b/services/merak-compute/activities/vm_delete_activity.go index 525a17fb..ca71a1ef 100644 --- a/services/merak-compute/activities/vm_delete_activity.go +++ b/services/merak-compute/activities/vm_delete_activity.go @@ -18,9 +18,10 @@ import ( "strconv" "strings" - agent_pb "github.com/futurewei-cloud/merak/api/proto/v1/agent" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + agentPB "github.com/futurewei-cloud/merak/api/proto/v1/agent" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" constants "github.com/futurewei-cloud/merak/services/common" + "github.com/futurewei-cloud/merak/services/merak-compute/common" "go.temporal.io/sdk/activity" "google.golang.org/grpc" @@ -28,7 +29,7 @@ import ( ) // Deletes a VM given by the vmID -func VmDelete(ctx context.Context, vmID string) error { +func VmDelete(ctx context.Context, vmID string) (string, error) { logger := activity.GetLogger(ctx) podIP := common.RedisClient.HGet(ctx, vmID, "hostIP").Val() @@ -40,11 +41,11 @@ func VmDelete(ctx context.Context, vmID string) error { conn, err := grpc.Dial(agent_address.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logger.Info("Failed to dial gRPC server address: "+agent_address.String(), err) - return err + return vmID, err } - client := agent_pb.NewMerakAgentServiceClient(conn) - port := agent_pb.InternalPortConfig{ - OperationType: common_pb.OperationType_DELETE, + client := agentPB.NewMerakAgentServiceClient(conn) + port := agentPB.InternalPortConfig{ + OperationType: commonPB.OperationType_DELETE, Name: common.RedisClient.HGet(ctx, vmID, "name").Val(), Projectid: common.RedisClient.HGet(ctx, vmID, "projectID").Val(), Deviceid: common.RedisClient.HGet(ctx, vmID, "deviceID").Val(), @@ -54,7 +55,10 @@ func VmDelete(ctx context.Context, vmID string) error { resp, err := client.PortHandler(ctx, &port) if err != nil { logger.Error("Unable delete vm ID " + podIP + "Reason: " + resp.GetReturnMessage() + "\n") - return err + return vmID, err } - return nil + common.RedisClient.HDel(ctx, vmID) // VM Detail hashmap + common.RedisClient.SRem(ctx, constants.COMPUTE_REDIS_VM_SET, vmID) // Set of all VM IDs + + return vmID, nil } diff --git a/services/merak-compute/activities/vm_info_activity.go b/services/merak-compute/activities/vm_info_activity.go index 06adf583..ad69687c 100644 --- a/services/merak-compute/activities/vm_info_activity.go +++ b/services/merak-compute/activities/vm_info_activity.go @@ -17,7 +17,7 @@ import ( "context" "strconv" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" pb "github.com/futurewei-cloud/merak/api/proto/v1/compute" constants "github.com/futurewei-cloud/merak/services/common" "github.com/futurewei-cloud/merak/services/merak-compute/common" @@ -33,7 +33,7 @@ func VmInfo(ctx context.Context) (*pb.ReturnComputeMessage, error) { logger.Error("Unable get VM IDs from redis", ids.Err()) return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, ReturnMessage: "Unable get node IDs from redis", }, ids.Err() } @@ -53,17 +53,17 @@ func VmInfo(ctx context.Context) (*pb.ReturnComputeMessage, error) { if err != nil { logger.Error("Failed to convert status string to int!", err) return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, ReturnMessage: "Failed to convert status string to int!", Vms: vms, }, err } - vm.Status = common_pb.Status(status) + vm.Status = commonPB.Status(status) vms = append(vms, &vm) } return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_OK, + ReturnCode: commonPB.ReturnCode_OK, ReturnMessage: "Success!", Vms: vms, }, nil diff --git a/services/merak-compute/handler/create.go b/services/merak-compute/handler/create.go index e95deef9..a55ed29d 100644 --- a/services/merak-compute/handler/create.go +++ b/services/merak-compute/handler/create.go @@ -19,7 +19,7 @@ import ( "log" "strconv" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" pb "github.com/futurewei-cloud/merak/api/proto/v1/compute" constants "github.com/futurewei-cloud/merak/services/common" "github.com/futurewei-cloud/merak/services/merak-compute/common" @@ -38,7 +38,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu } log.Println("Operation Create") - return_vms := []*pb.InternalVMInfo{} + returnVMs := []*pb.InternalVMInfo{} // Add pods to DB for n, pod := range in.Config.Pods { if err := RedisClient.HSet( @@ -51,7 +51,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu ).Err(); err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable add pod to DB Hash Map", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } log.Println("Added pod " + pod.Name + " at address " + pod.ContainerIp) @@ -62,7 +62,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu ).Err(); err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to add pod to DB Hash Set", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } @@ -79,7 +79,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu ).Err(); err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to VM to DB Hash Set", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } if err := RedisClient.HSet( @@ -101,10 +101,10 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu ).Err(); err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable add VM to DB Hash Map", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } - return_vm := pb.InternalVMInfo{ + returnVM := pb.InternalVMInfo{ Id: vmID, Name: "v" + suffix, VpcId: vpc.VpcId, @@ -112,17 +112,17 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu SecurityGroupId: in.Config.VmDeploy.Secgroups[0], SubnetId: subnet.SubnetId, DefaultGateway: subnet.SubnetGw, - Status: common_pb.Status(1), + Status: commonPB.Status(1), } - return_vms = append(return_vms, &return_vm) + returnVMs = append(returnVMs, &returnVM) // Store VM to Pod list log.Println("Added VM " + vmID + " for vpc " + vpc.VpcId + " for subnet " + subnet.SubnetId + " vm number " + strconv.Itoa(k+1) + " of " + strconv.Itoa(int(subnet.NumberVms))) if err := RedisClient.LPush(ctx, "l"+pod.Id, vmID).Err(); err != nil { log.Println("Failed to add pod -> vm mapping " + vmID) return &pb.ReturnComputeMessage{ ReturnMessage: "Unable add VM to pod list", - ReturnCode: common_pb.ReturnCode_FAILED, - Vms: return_vms, + ReturnCode: commonPB.ReturnCode_FAILED, + Vms: returnVMs, }, err } log.Println("Added pod -> vm mapping " + vmID) @@ -134,9 +134,9 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu if vms.Err() != nil { log.Println("Unable get node vmIDsList from redis", vms.Err()) return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, ReturnMessage: "Unable get node vmIDsList from redis", - Vms: return_vms, + Vms: returnVMs, }, vms.Err() } @@ -152,8 +152,8 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu if err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to execute create workflow", - ReturnCode: common_pb.ReturnCode_FAILED, - Vms: return_vms, + ReturnCode: commonPB.ReturnCode_FAILED, + Vms: returnVMs, }, err } log.Println("Started Create workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID()) @@ -161,7 +161,7 @@ func caseCreate(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu return &pb.ReturnComputeMessage{ ReturnMessage: "Successfully started all create workflows!", - ReturnCode: common_pb.ReturnCode_OK, - Vms: return_vms, + ReturnCode: commonPB.ReturnCode_OK, + Vms: returnVMs, }, nil } diff --git a/services/merak-compute/handler/delete.go b/services/merak-compute/handler/delete.go index 8dc13830..56bdebf5 100644 --- a/services/merak-compute/handler/delete.go +++ b/services/merak-compute/handler/delete.go @@ -19,7 +19,7 @@ import ( "log" "strconv" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" pb "github.com/futurewei-cloud/merak/api/proto/v1/compute" constants "github.com/futurewei-cloud/merak/services/common" "github.com/futurewei-cloud/merak/services/merak-compute/common" @@ -38,37 +38,37 @@ func caseDelete(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu } log.Println("Operation Delete") - pod_list := RedisClient.SMembers( + podList := RedisClient.SMembers( ctx, constants.COMPUTE_REDIS_NODE_IP_SET, ) - if pod_list.Err() != nil { - log.Println("Unable get VM IDs from redis", pod_list.Err()) + if podList.Err() != nil { + log.Println("Unable get VM IDs from redis", podList.Err()) return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, ReturnMessage: "Unable get node IDs from redis", - }, pod_list.Err() + }, podList.Err() } // Get list of all vms in pod - for n, pod_id := range pod_list.Val() { - vms := RedisClient.LRange(ctx, "l"+pod_id, 0, -1) + for n, podID := range podList.Val() { + vms := RedisClient.LRange(ctx, "l"+podID, 0, -1) if vms.Err() != nil { log.Println("Unable get node vmIDsList from redis", vms.Err()) return &pb.ReturnComputeMessage{ - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, ReturnMessage: "Unable get node vmIDsList from redis", }, vms.Err() } - for _, vm_id := range vms.Val() { + for _, vmID := range vms.Val() { if err := RedisClient.HSet( ctx, - vm_id, + vmID, "status", "3", ).Err(); err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to set VM status to deleting in DB Hash Map", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } } @@ -78,21 +78,18 @@ func caseDelete(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Retu RetryPolicy: retrypolicy, } log.Println("Executing VM Delete Workflow!") - we, err := TemporalClient.ExecuteWorkflow(context.Background(), workflowOptions, delete.Delete, vms.Val()) + we, err := TemporalClient.ExecuteWorkflow(context.Background(), workflowOptions, delete.Delete, vms.Val(), podID) if err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to execute delete workflow", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } log.Println("Started Delete workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID()) } - log.Println("Deleting all VMs from DB") - RedisClient.FlushAll(ctx) - return &pb.ReturnComputeMessage{ ReturnMessage: "Successfully started all delete workflows!", - ReturnCode: common_pb.ReturnCode_OK, + ReturnCode: commonPB.ReturnCode_OK, }, nil } diff --git a/services/merak-compute/handler/info.go b/services/merak-compute/handler/info.go index 39d87498..effe48c5 100644 --- a/services/merak-compute/handler/info.go +++ b/services/merak-compute/handler/info.go @@ -18,7 +18,7 @@ import ( "context" "log" - common_pb "github.com/futurewei-cloud/merak/api/proto/v1/common" + commonPB "github.com/futurewei-cloud/merak/api/proto/v1/common" pb "github.com/futurewei-cloud/merak/api/proto/v1/compute" "github.com/futurewei-cloud/merak/services/merak-compute/common" "github.com/futurewei-cloud/merak/services/merak-compute/workflows/info" @@ -51,7 +51,7 @@ func caseInfo(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Return if err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: "Unable to execute info workflow", - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, }, err } log.Println("Started workflow WorkflowID "+we.GetID()+" RunID ", we.GetRunID()) @@ -61,7 +61,7 @@ func caseInfo(ctx context.Context, in *pb.InternalComputeConfigInfo) (*pb.Return if err != nil { return &pb.ReturnComputeMessage{ ReturnMessage: result.GetReturnMessage(), - ReturnCode: common_pb.ReturnCode_FAILED, + ReturnCode: commonPB.ReturnCode_FAILED, Vms: result.GetVms(), }, err } diff --git a/services/merak-compute/workflows/delete/delete.go b/services/merak-compute/workflows/delete/delete.go index d83bd68d..78aa9167 100644 --- a/services/merak-compute/workflows/delete/delete.go +++ b/services/merak-compute/workflows/delete/delete.go @@ -14,15 +14,17 @@ Copyright(c) 2022 Futurewei Cloud package delete import ( + "context" "strings" + constants "github.com/futurewei-cloud/merak/services/common" "github.com/futurewei-cloud/merak/services/merak-compute/activities" "github.com/futurewei-cloud/merak/services/merak-compute/common" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" ) -func Delete(ctx workflow.Context, vms []string) (err error) { +func Delete(ctx workflow.Context, vms []string, podID string) (err error) { retrypolicy := &temporal.RetryPolicy{ InitialInterval: common.TEMPORAL_ACTIVITY_RETRY_INTERVAL, BackoffCoefficient: common.TEMPORAL_ACTIVITY_BACKOFF, @@ -43,14 +45,26 @@ func Delete(ctx workflow.Context, vms []string) (err error) { futures = append(futures, future) } logger.Info("Started VmDelete workflows for vms" + strings.Join(vms, " ")) + var vmID string + wfContext := context.Background() for _, future := range futures { - err = future.Get(ctx, nil) - logger.Info("Activity completed!") + err = future.Get(ctx, &vmID) if err != nil { - return + logger.Error("Failed to delete VM ID " + vmID) + return err } + + logger.Info("Deleted VM ID " + vmID) + + // Delete Single VM from DB + common.RedisClient.LRem(wfContext, "l"+podID, 1, vmID) } - logger.Info("All activities completed") + // All VMs on pod have been deleted. + // Delete all Pod/VM associations from DB + common.RedisClient.HDel(wfContext, podID) + common.RedisClient.SRem(wfContext, constants.COMPUTE_REDIS_NODE_IP_SET, podID) + common.RedisClient.Del(wfContext, "l"+podID) + logger.Info("All VMs for pod " + podID + " deleted!") return nil }