SKS-3001 & SKS-2691: Support inplace-update and CPU and memory expansion (#185)
huaqing1994 authored Nov 12, 2024
1 parent 77cf7ec commit 831b562
Showing 30 changed files with 779 additions and 1,351 deletions.
20 changes: 17 additions & 3 deletions api/v1beta1/conditions_consts.go
@@ -125,9 +125,6 @@ const (
// ResourcesHotUpdatedCondition documents the status of the hot updating resources of a VM.
ResourcesHotUpdatedCondition = "ResourceHotUpdated"

// WaitingForResourcesHotUpdateReason (Severity=Info) documents an ElfMachine waiting for updating resources.
WaitingForResourcesHotUpdateReason = "WaitingForResourcesHotUpdate"

// ExpandingVMDiskReason documents (Severity=Info) ElfMachine currently executing the expand disk operation.
ExpandingVMDiskReason = "ExpandingVMDisk"

@@ -144,6 +141,23 @@ const (
// detecting an error while adding new disk capacity to root directory; those kind of errors are
// usually transient and failed updating are automatically re-tried by the controller.
ExpandingRootPartitionFailedReason = "ExpandingRootPartitionFailed"

// ExpandingVMComputeResourcesReason documents (Severity=Info) an ElfMachine currently executing the
// expand resources (CPU/memory) operation.
ExpandingVMComputeResourcesReason = "ExpandingVMComputeResources"

// ExpandingVMComputeResourcesFailedReason (Severity=Warning) documents an ElfMachine controller detecting
// an error while expanding resources (CPU/memory); these kinds of errors are usually transient and
// failed updates are automatically retried by the controller.
ExpandingVMComputeResourcesFailedReason = "ExpandingVMComputeResourcesFailed"

// RestartingKubeletReason documents (Severity=Info) an ElfMachine currently executing the restart-kubelet operation.
RestartingKubeletReason = "RestartingKubelet"

// RestartingKubeletFailedReason (Severity=Warning) documents an ElfMachine controller detecting
// an error while restarting kubelet; these kinds of errors are usually transient and failed restarts
// are automatically retried by the controller.
RestartingKubeletFailedReason = "RestartingKubeletFailed"
)

// Conditions and Reasons related to make connections to a Tower. Can currently be used by ElfCluster and ElfMachine
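The new reason constants above are set and cleared through the cluster-api conditions helpers, the same way the controller code later in this commit uses them. The following is a minimal sketch, not part of this commit and with invented function names, showing that usage:

package sketch

import (
    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
    "sigs.k8s.io/cluster-api/util/conditions"

    infrav1 "github.com/smartxworks/cluster-api-provider-elf/api/v1beta1"
)

// markComputeResourcesExpanding records that a CPU/memory hot expansion is in
// progress on the ElfMachine (Severity=Info).
func markComputeResourcesExpanding(elfMachine *infrav1.ElfMachine) {
    conditions.MarkFalse(elfMachine, infrav1.ResourcesHotUpdatedCondition,
        infrav1.ExpandingVMComputeResourcesReason, clusterv1.ConditionSeverityInfo, "")
}

// markHotUpdateDone flips the condition back to True once every hot-update
// step (disk, CPU/memory, kubelet restart) has completed.
func markHotUpdateDone(elfMachine *infrav1.ElfMachine) {
    conditions.MarkTrue(elfMachine, infrav1.ResourcesHotUpdatedCondition)
}

The Failed variants are applied with Severity=Warning by the controller when the corresponding Tower task or host-agent job fails.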
9 changes: 9 additions & 0 deletions api/v1beta1/elfmachine_types.go
@@ -19,6 +19,7 @@ package v1beta1
import (
"time"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capierrors "sigs.k8s.io/cluster-api/errors"
@@ -259,6 +260,14 @@ func (m *ElfMachine) IsHotUpdating() bool {
return false
}

// IsResourcesUpToDate returns whether the machine resources are up to date.
func (m *ElfMachine) IsResourcesUpToDate() bool {
specMemory := *resource.NewQuantity(m.Spec.MemoryMiB*1024*1024, resource.BinarySI)
return m.Spec.DiskGiB == m.Status.Resources.Disk &&
m.Spec.NumCPUs == m.Status.Resources.CPUCores &&
specMemory.Equal(m.Status.Resources.Memory)
}

func (m *ElfMachine) SetVMDisconnectionTimestamp(timestamp *metav1.Time) {
if m.Annotations == nil {
m.Annotations = make(map[string]string)
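IsResourcesUpToDate compares the desired spec against the resources recorded in status, building a resource.Quantity from Spec.MemoryMiB for the memory comparison. A small illustrative sketch, not part of this commit and using made-up field values, of how the check behaves while a memory expansion is still pending:

package sketch

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"

    infrav1 "github.com/smartxworks/cluster-api-provider-elf/api/v1beta1"
)

// demoIsResourcesUpToDate: CPU and disk already match the spec, but the
// recorded memory is still 4096 MiB while the spec asks for 8192 MiB.
func demoIsResourcesUpToDate() {
    m := &infrav1.ElfMachine{}
    m.Spec.NumCPUs = 8
    m.Spec.MemoryMiB = 8192
    m.Spec.DiskGiB = 100

    m.Status.Resources.CPUCores = 8
    m.Status.Resources.Disk = 100
    m.Status.Resources.Memory = *resource.NewQuantity(4096*1024*1024, resource.BinarySI) // "4Gi"

    fmt.Println(m.IsResourcesUpToDate()) // false until the memory expansion is reconciled
}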
5 changes: 5 additions & 0 deletions api/v1beta1/types.go
@@ -18,6 +18,7 @@ package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// CloneMode is the type of clone operation used to clone a VM from a template.
@@ -199,6 +200,10 @@ type GPUStatus struct {
// ResourcesStatus records the resources allocated to the virtual machine.
type ResourcesStatus struct {
Disk int32 `json:"disk,omitempty"`
// CPUCores is the total number of CPU cores allocated for the virtual machine.
CPUCores int32 `json:"cpu,omitempty"`
// Memory is the total amount of memory in MiB allocated to the virtual machine.
Memory resource.Quantity `json:"memory,omitempty"`
}

//+kubebuilder:object:generate=false
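Status.Resources.Memory is a resource.Quantity, which marshals as an int-or-string value; that is why the regenerated CRD schema below accepts both an integer and a unit-suffixed string for the memory field. An illustrative sketch, not part of this commit, with made-up values:

package sketch

import (
    "encoding/json"
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"

    infrav1 "github.com/smartxworks/cluster-api-provider-elf/api/v1beta1"
)

// demoResourcesStatus shows how a populated ResourcesStatus renders on the wire.
func demoResourcesStatus() {
    status := infrav1.ResourcesStatus{
        Disk:     100,
        CPUCores: 8,
        Memory:   resource.MustParse("16Gi"),
    }
    out, _ := json.Marshal(status)
    fmt.Println(string(out)) // {"disk":100,"cpu":8,"memory":"16Gi"}
}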
3 changes: 2 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions config/crd/bases/infrastructure.cluster.x-k8s.io_elfmachines.yaml
@@ -440,9 +440,22 @@ spec:
resources:
description: Resources records the resources allocated for the machine.
properties:
cpu:
description: CPUCores is the total number of CPU cores allocated
for the virtual machine.
format: int32
type: integer
disk:
format: int32
type: integer
memory:
anyOf:
- type: integer
- type: string
description: Memory is the total amount of memory in MiB allocated
to the virtual machine.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
taskRef:
description: |-
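The anyOf plus x-kubernetes-int-or-string schema above means the memory value is accepted either as a plain byte count or as a unit-suffixed string, and both forms parse to the same quantity. A tiny sketch, not part of this commit:

package sketch

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// demoIntOrString: the integer and string forms accepted by the CRD schema
// describe the same amount of memory.
func demoIntOrString() {
    fromString := resource.MustParse("4Gi")
    fromInt := *resource.NewQuantity(4*1024*1024*1024, resource.BinarySI)
    fmt.Println(fromString.Equal(fromInt)) // true
}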
26 changes: 0 additions & 26 deletions config/rbac/role.yaml
@@ -129,29 +129,3 @@ rules:
- get
- patch
- update
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates/finalizers
verbs:
- update
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates/status
verbs:
- get
- patch
- update
11 changes: 8 additions & 3 deletions controllers/elfmachine_controller.go
@@ -206,8 +206,8 @@ func (r *ElfMachineReconciler) Reconcile(ctx goctx.Context, req ctrl.Request) (r
// always update the readyCondition.
conditions.SetSummary(machineCtx.ElfMachine,
conditions.WithConditions(
infrav1.VMProvisionedCondition,
infrav1.ResourcesHotUpdatedCondition,
infrav1.VMProvisionedCondition,
infrav1.TowerAvailableCondition,
),
)
@@ -973,7 +973,7 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx goctx.Context, machineCtx *co
machineCtx.ElfMachine.SetVMFirstBootTimestamp(&now)
}

if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) {
releaseTicketForCreateVM(machineCtx.ElfMachine.Name)
recordElfClusterStorageInsufficient(machineCtx, false)
recordElfClusterMemoryInsufficient(machineCtx, false)
@@ -1024,7 +1024,12 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx goctx.Context, machineC
case service.IsUpdateVMDiskTask(task, machineCtx.ElfMachine.Name):
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMDiskReason || reason == infrav1.ExpandingVMDiskFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, errorMessage)
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
}
case service.IsUpdateVMTask(task) && conditions.IsFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition):
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMComputeResourcesReason || reason == infrav1.ExpandingVMComputeResourcesFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if machineCtx.ElfMachine.RequiresGPUDevices() {
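The failed-task handling above only downgrades ResourcesHotUpdatedCondition when the condition's current reason already belongs to the step whose Tower task failed, so a failure in an unrelated task cannot clobber another step's progress. A minimal sketch of that guard pattern, not part of this commit; markStepFailed is an invented helper name:

package sketch

import (
    clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
    "sigs.k8s.io/cluster-api/util/conditions"

    infrav1 "github.com/smartxworks/cluster-api-provider-elf/api/v1beta1"
)

// markStepFailed marks the hot-update condition as failed only if the
// condition currently carries one of the reasons owned by this step.
func markStepFailed(elfMachine *infrav1.ElfMachine, stepReasons []string, failedReason, message string) {
    current := conditions.GetReason(elfMachine, infrav1.ResourcesHotUpdatedCondition)
    for _, reason := range stepReasons {
        if current == reason {
            conditions.MarkFalse(elfMachine, infrav1.ResourcesHotUpdatedCondition,
                failedReason, clusterv1.ConditionSeverityWarning, "%s", message)
            return
        }
    }
}

For the disk step it would be invoked with ExpandingVMDiskReason and ExpandingVMDiskFailedReason as the owned reasons, mirroring the case statement above.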
128 changes: 99 additions & 29 deletions controllers/elfmachine_controller_resources.go
@@ -22,6 +22,7 @@ import (
"github.com/smartxworks/cloudtower-go-sdk/v2/models"
agentv1 "github.com/smartxworks/host-config-agent-api/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capiremote "sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -32,38 +33,26 @@ import (
"github.com/smartxworks/cluster-api-provider-elf/pkg/context"
"github.com/smartxworks/cluster-api-provider-elf/pkg/hostagent"
"github.com/smartxworks/cluster-api-provider-elf/pkg/service"
machineutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/machine"
)

func (r *ElfMachineReconciler) reconcileVMResources(ctx goctx.Context, machineCtx *context.MachineContext, vm *models.VM) (bool, error) {
log := ctrl.LoggerFrom(ctx)

hotUpdatedCondition := conditions.Get(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if hotUpdatedCondition != nil &&
hotUpdatedCondition.Reason == infrav1.WaitingForResourcesHotUpdateReason &&
hotUpdatedCondition.Message != "" {
log.Info("Waiting for hot updating resources", "message", hotUpdatedCondition.Message)

return false, nil
if ok, err := r.reconcileVMCPUAndMemory(ctx, machineCtx, vm); err != nil || !ok {
return ok, err
}

if ok, err := r.reconcieVMVolume(ctx, machineCtx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
if ok, err := r.restartKubelet(ctx, machineCtx); err != nil || !ok {
return ok, err
}

// Agent needs to wait for the node exists before it can run and execute commands.
if machineutil.IsUpdatingElfMachineResources(machineCtx.ElfMachine) &&
machineCtx.Machine.Status.NodeInfo == nil {
log.Info("Waiting for node exists for host agent expand vm root partition")

return false, nil
if ok, err := r.reconcieVMVolume(ctx, machineCtx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
return ok, err
}

if ok, err := r.expandVMRootPartition(ctx, machineCtx); err != nil || !ok {
return ok, err
}

if machineutil.IsUpdatingElfMachineResources(machineCtx.ElfMachine) {
if conditions.IsFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition) {
conditions.MarkTrue(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
}

@@ -148,8 +137,6 @@ func (r *ElfMachineReconciler) resizeVMVolume(ctx goctx.Context, machineCtx *con

// expandVMRootPartition adds new disk capacity to root partition.
func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
log := ctrl.LoggerFrom(ctx)

reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
@@ -164,35 +151,118 @@ func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineC
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")
}

return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeExpandRootPartition)
}

// reconcileVMCPUAndMemory ensures that the vm CPU and memory are as expected.
func (r *ElfMachineReconciler) reconcileVMCPUAndMemory(ctx goctx.Context, machineCtx *context.MachineContext, vm *models.VM) (bool, error) {
machineCtx.ElfMachine.Status.Resources.CPUCores = *vm.Vcpu
machineCtx.ElfMachine.Status.Resources.Memory = *resource.NewQuantity(service.ByteToMiB(*vm.Memory)*1024*1024, resource.BinarySI)

if !(machineCtx.ElfMachine.Spec.NumCPUs > *vm.Vcpu ||
machineCtx.ElfMachine.Spec.MemoryMiB > service.ByteToMiB(*vm.Memory)) {
return true, nil
}

reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" ||
(reason != infrav1.ExpandingVMComputeResourcesReason && reason != infrav1.ExpandingVMComputeResourcesFailedReason) {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesReason, clusterv1.ConditionSeverityInfo, "")

// Save the condition first, and then expand the resource capacity.
// This prevents the case where the resource expansion succeeds but the condition
// fails to be saved, leaving the ElfMachine with no record of the expansion.
return false, nil
}

log := ctrl.LoggerFrom(ctx)

if ok := acquireTicketForUpdatingVM(machineCtx.ElfMachine.Name); !ok {
log.V(1).Info(fmt.Sprintf("The VM operation has reached the rate limit, skip updating CPU and memory for VM %s", machineCtx.ElfMachine.Status.VMRef))

return false, nil
}

withTaskVM, err := machineCtx.VMService.UpdateVM(vm, machineCtx.ElfMachine)
if err != nil {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

return false, errors.Wrapf(err, "failed to trigger update CPU and memory for VM %s", *vm.Name)
}

machineCtx.ElfMachine.SetTask(*withTaskVM.TaskID)

log.Info("Waiting for the VM to be updated CPU and memory", "vmRef", machineCtx.ElfMachine.Status.VMRef, "taskRef", machineCtx.ElfMachine.Status.TaskRef)

return false, nil
}

func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
} else if reason != infrav1.ExpandingVMComputeResourcesReason &&
reason != infrav1.ExpandingVMComputeResourcesFailedReason &&
reason != infrav1.RestartingKubeletReason &&
reason != infrav1.RestartingKubeletFailedReason {
return true, nil
}

if reason != infrav1.RestartingKubeletFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletReason, clusterv1.ConditionSeverityInfo, "")
}

return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeRestartKubelet)
}

func (r *ElfMachineReconciler) reconcileHostJob(ctx goctx.Context, machineCtx *context.MachineContext, jobType hostagent.HostAgentJobType) (bool, error) {
log := ctrl.LoggerFrom(ctx)
failReason := ""
switch jobType {
case hostagent.HostAgentJobTypeExpandRootPartition:
failReason = infrav1.ExpandingRootPartitionFailedReason
case hostagent.HostAgentJobTypeRestartKubelet:
failReason = infrav1.RestartingKubeletFailedReason
}

// The host agent needs to wait for the node to exist before it can run and execute commands.
if machineCtx.Machine.Status.NodeInfo == nil {
log.Info("Waiting for node to exist for host agent job", "jobType", jobType)

return false, nil
}

kubeClient, err := capiremote.NewClusterClient(ctx, "", r.Client, client.ObjectKey{Namespace: machineCtx.Cluster.Namespace, Name: machineCtx.Cluster.Name})
if err != nil {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, failReason, clusterv1.ConditionSeverityWarning, "failed to create kubeClient: "+err.Error())
return false, err
}

agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetExpandRootPartitionJobName(machineCtx.ElfMachine))
agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetJobName(machineCtx.ElfMachine, jobType))
if err != nil && !apierrors.IsNotFound(err) {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, failReason, clusterv1.ConditionSeverityWarning, "failed to get HostOperationJob: "+err.Error())
return false, err
}

if agentJob == nil {
agentJob, err = hostagent.ExpandRootPartition(ctx, kubeClient, machineCtx.ElfMachine)
if err != nil {
agentJob = hostagent.GenerateJob(machineCtx.ElfMachine, jobType)
if err = kubeClient.Create(ctx, agentJob); err != nil {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

return false, err
}

log.Info("Waiting for expanding root partition", "hostAgentJob", agentJob.Name)
log.Info("Waiting for job to complete", "hostAgentJob", agentJob.Name)

return false, nil
}

switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
log.Info("Expand root partition to root succeeded", "hostAgentJob", agentJob.Name)
log.Info("HostJob succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
log.Info("Expand root partition failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, failReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
log.Info("HostJob failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)

lastExecutionTime := agentJob.Status.LastExecutionTime
if lastExecutionTime == nil {
@@ -201,13 +271,13 @@ func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineC
// Three minutes after the job fails, delete the job and try again.
if time.Now().After(lastExecutionTime.Add(3 * time.Minute)) {
if err := kubeClient.Delete(ctx, agentJob); err != nil {
return false, errors.Wrapf(err, "failed to delete expand root partition job %s/%s for retry", agentJob.Namespace, agentJob.Name)
return false, errors.Wrapf(err, "failed to delete hostJob %s/%s for retry", agentJob.Namespace, agentJob.Name)
}
}

return false, nil
default:
log.Info("Waiting for expanding root partition job done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)
log.Info("Waiting for HostJob done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)

return false, nil
}
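The new resource pipeline in reconcileVMResources runs the CPU/memory expansion first, then the kubelet restart via a host-agent job, then the volume and root-partition steps, and only marks ResourcesHotUpdatedCondition true once all of them report done. One detail worth noting is that reconcileVMCPUAndMemory records the Tower-reported memory (bytes) at MiB granularity, so the stored quantity lines up with Spec.MemoryMiB during the comparison in IsResourcesUpToDate. A small worked sketch, not part of this commit; byteToMiB stands in for service.ByteToMiB and is assumed to be plain integer truncation:

package sketch

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// byteToMiB stands in for service.ByteToMiB (assumed integer truncation);
// the real helper lives in pkg/service and is not shown in this diff.
func byteToMiB(b int64) int64 {
    return b / (1024 * 1024)
}

// demoMemoryRecording: a VM reporting 4294967296 bytes is recorded as "4Gi",
// i.e. 4096 MiB, matching the granularity of Spec.MemoryMiB.
func demoMemoryRecording() {
    vmMemoryBytes := int64(4 * 1024 * 1024 * 1024)
    recorded := *resource.NewQuantity(byteToMiB(vmMemoryBytes)*1024*1024, resource.BinarySI)
    fmt.Println(recorded.String()) // "4Gi"
}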
(The remaining changed files in this commit are not rendered here.)
