Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve node group tracking #388

Merged
merged 12 commits into from
Jun 11, 2024
4 changes: 4 additions & 0 deletions hack/setup-capo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.

# XXX(mnaser): This is a workaround for when Cluster API is tagged but not
# released yet.
export GOPROXY=off

# Versions to test
CAPI_VERSION=${CAPI_VERSION:-v1.6.0}
CAPO_VERSION=${CAPO_VERSION:-v0.9.0}
Expand Down
251 changes: 147 additions & 104 deletions magnum_cluster_api/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@
from magnum.conductor import scale_manager
from magnum.drivers.common import driver
from magnum.objects import fields
from tenacity import (
Retrying,
retry_if_exception,
retry_if_not_result,
retry_unless_exception_type,
stop_after_delay,
wait_fixed,
)

from magnum_cluster_api import (
clients,
Expand Down Expand Up @@ -56,6 +48,12 @@ def __init__(self):
def create_cluster(
self, context, cluster: magnum_objects.Cluster, cluster_create_timeout: int
):
"""
Create cluster.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
# NOTE(mnaser): We want to set the `stack_id` as early as possible to
# make sure we can use it in the cluster creation.
cluster.stack_id = utils.generate_cluster_api_name(self.k8s_api)
Expand Down Expand Up @@ -123,6 +121,38 @@ def _get_cluster_status_reason(self, capi_cluster):
capi_ops_cluster_status_reason,
)

def update_cluster_control_plane_status(
    self,
    context,
    cluster: magnum_objects.Cluster,
):
    """Sync the control-plane node group status from the KubeadmControlPlane.

    Reads the ``KubeadmControlPlane`` resource backing the cluster and maps
    its replica/readiness state onto the default "master" node group, saving
    the node group record.

    :param context: Request context (kept for driver-method signature
        consistency; not used directly here).
    :param cluster: The Magnum cluster whose control plane is inspected.
    :returns: The (possibly updated) default master node group.
    """
    nodegroup = cluster.default_ng_master
    # Node group statuses look like CREATE_IN_PROGRESS / UPDATE_COMPLETE;
    # keep only the leading verb so we can re-suffix it below.
    action = nodegroup.status.split("_")[0]

    kcp = resources.get_kubeadm_control_plane(self.k8s_api, cluster)
    if kcp is None:
        # The KubeadmControlPlane may not exist yet (e.g. very early in
        # cluster creation); leave the node group untouched.
        return nodegroup

    # NOTE: read `status` once with a default so a KubeadmControlPlane that
    # has not reported any status yet cannot raise KeyError (the original
    # mixed `.get("status", {})` with direct `["status"]` indexing).
    status = kcp.obj.get("status", {})

    generation = status.get("observedGeneration", 1)
    if generation > 1:
        # Any generation past the first means the control plane spec has
        # changed since creation, so report this as an update.
        action = "UPDATE"

    ready = status.get("ready", False)
    failure_message = status.get("failureMessage")

    updated_replicas = status.get("updatedReplicas")
    replicas = status.get("replicas")

    if updated_replicas != replicas:
        # A rollout is still converging replicas to the desired spec.
        nodegroup.status = f"{action}_IN_PROGRESS"
    elif updated_replicas == replicas and ready:
        nodegroup.status = f"{action}_COMPLETE"
    # Surface any failure message from the control plane (None clears it).
    nodegroup.status_reason = failure_message

    nodegroup.save()

    return nodegroup

@cluster_lock_wrapper
def update_cluster_status(
self, context, cluster: magnum_objects.Cluster, use_admin_ctx: bool = False
Expand All @@ -131,11 +161,10 @@ def update_cluster_status(
# need to refresh it to make sure we have the latest data.
cluster.refresh()

node_groups = [
self.update_nodegroup_status(context, cluster, node_group)
for node_group in cluster.nodegroups
]
# TODO: watch for topology change instead
node_groups = [
self.update_cluster_control_plane_status(context, cluster)
] + self.update_nodegroups_status(context, cluster)
osc = clients.get_openstack_api(context)

capi_cluster = resources.Cluster(context, self.k8s_api, cluster).get_or_none()
Expand Down Expand Up @@ -234,6 +263,12 @@ def update_cluster(
scale_manager=None,
rollback=False,
):
"""
Update cluster.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
raise NotImplementedError()

@cluster_lock_wrapper
Expand All @@ -246,6 +281,16 @@ def resize_cluster(
nodes_to_remove: list[str],
nodegroup: magnum_objects.NodeGroup = None,
):
"""
Resize cluster (primarily add or remove nodes).

The cluster object passed to this method is already not in `UPDATE_IN_PROGRESS`
state and the node group object passed to this method is in `UPDATE_IN_PROGRESS`
state and saved.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
utils.validate_cluster(context, cluster)

if nodes_to_remove:
Expand Down Expand Up @@ -291,6 +336,9 @@ def upgrade_cluster(
For now, upgrade cluster simply modifies the labels that are necessary for the
upgrade, nothing else. For the future, we can perhaps use the `update_cluster`
API.

This method is called synchronously by the Magnum API, therefore it will be blocking
the Magnum API, so it should be as fast as possible.
"""
need_to_wait = (
cluster.default_ng_master.image_id != cluster_template.image_id
Expand Down Expand Up @@ -328,6 +376,12 @@ def upgrade_cluster(

@cluster_lock_wrapper
def delete_cluster(self, context, cluster: magnum_objects.Cluster):
"""
Delete cluster.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
if cluster.stack_id is None:
return
# NOTE(mnaser): This should be removed when this is fixed:
Expand All @@ -348,83 +402,76 @@ def create_nodegroup(
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
"""
Create node group.

The cluster object passed to this method is already in `UPDATE_IN_PROGRESS` state
and the node group object passed to this method is in `CREATE_IN_PROGRESS` state.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
utils.validate_nodegroup(nodegroup, context)

cluster_resource = objects.Cluster.for_magnum_cluster(self.k8s_api, cluster)

cluster_resource.obj["spec"]["topology"]["workers"][
"machineDeployments"
].append(resources.mutate_machine_deployment(context, cluster, nodegroup))

utils.kube_apply_patch(cluster_resource)

for attempt in Retrying(
retry=retry_if_exception(exceptions.MachineDeploymentNotFound),
stop=stop_after_delay(10),
wait=wait_fixed(1),
):
with attempt:
objects.MachineDeployment.for_node_group(
self.k8s_api, cluster, nodegroup
)

nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
nodegroup.save()

cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
cluster.save()

def update_nodegroups_status(
    self, context, cluster: magnum_objects.Cluster
) -> list[magnum_objects.NodeGroup]:
    """Sync every worker node group's status from its MachineDeployment.

    For each node group of the cluster, inspects the matching
    ``MachineDeployment`` (if any) and promotes the node group out of its
    ``*_IN_PROGRESS`` state once the corresponding condition is met, saving
    the node group whenever its status changes.

    :param context: Request context (kept for driver-method signature
        consistency; not used directly here).
    :param cluster: The Magnum cluster whose node groups are inspected.
    :returns: The list of (possibly updated) node group objects.
    """
    node_groups = []

    # NOTE: the Cluster resource depends only on `cluster`, so fetch it once
    # instead of once per node group (it was loop-invariant).
    cluster_resource = objects.Cluster.for_magnum_cluster(self.k8s_api, cluster)

    for node_group in cluster.nodegroups:
        md = objects.MachineDeployment.for_node_group_or_none(
            self.k8s_api, cluster, node_group
        )
        md_is_running = (
            md is not None and md.obj.get("status", {}).get("phase") == "Running"
        )

        # NOTE(mnaser): If the node group is in `CREATE_IN_PROGRESS` state, we need to
        #               wait for the `MachineDeployment` to hit the `Running` phase
        #               before we can mark the node group as `CREATE_COMPLETE`.
        if (
            node_group.status == fields.ClusterStatus.CREATE_IN_PROGRESS
            and md_is_running
        ):
            node_group.status = fields.ClusterStatus.CREATE_COMPLETE
            node_group.save()

        # NOTE(mnaser): If the cluster is in `UPDATE_IN_PROGRESS` state, we need to
        #               wait for the `MachineDeployment` to match the desired state
        #               from the `Cluster` resource and that it is in the `Running`
        #               phase before we can mark the node group as `UPDATE_COMPLETE`.
        if (
            node_group.status == fields.ClusterStatus.UPDATE_IN_PROGRESS
            and md_is_running
            and md.equals_spec(
                cluster_resource.get_machine_deployment_spec(node_group.name)
            )
        ):
            node_group.status = fields.ClusterStatus.UPDATE_COMPLETE
            node_group.save()

        # NOTE(mnaser): If the cluster is in `DELETE_IN_PROGRESS` state, we need to
        #               wait for the `MachineDeployment` to be deleted before we can
        #               mark the node group as `DELETE_COMPLETE`.
        if (
            node_group.status == fields.ClusterStatus.DELETE_IN_PROGRESS
            and md is None
        ):
            node_group.status = fields.ClusterStatus.DELETE_COMPLETE
            node_group.save()

        node_groups.append(node_group)

    return node_groups

@cluster_lock_wrapper
def update_nodegroup(
Expand All @@ -433,6 +480,16 @@ def update_nodegroup(
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
"""
Update node group (primarily resize it)

The cluster object passed to this method is already in `UPDATE_IN_PROGRESS` state
and the node group object passed to this method is in `UPDATE_IN_PROGRESS` state
but it's not saved.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""
self._update_nodegroup(context, cluster, nodegroup)

def _update_nodegroup(
Expand All @@ -459,18 +516,6 @@ def _update_nodegroup(
cluster_resource.set_machine_deployment_spec(nodegroup.name, target_md_spec)
utils.kube_apply_patch(cluster_resource)

for attempt in Retrying(
retry=retry_if_not_result(lambda md: md.equals_spec(target_md_spec)),
stop=stop_after_delay(10),
wait=wait_fixed(1),
):
with attempt:
md = objects.MachineDeployment.for_node_group(
self.k8s_api, cluster, nodegroup
)
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(md)

nodegroup.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
nodegroup.save()

Expand All @@ -484,6 +529,23 @@ def delete_nodegroup(
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
"""
Delete node group.

The cluster resource that is passed to this method is already in `UPDATE_IN_PROGRESS`
however the node group object passed to this method is in `DELETE_IN_PROGRESS` state
but it's not saved.

This method is called asynchronously by the Magnum API, therefore it will not be
blocking the Magnum API.
"""

# NOTE(mnaser): We want to switch the node group to `DELETE_IN_PROGRESS` state
# as soon as possible to make sure that the Magnum API knows that
# the node group is being deleted.
nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.save()

cluster_resource = objects.Cluster.for_magnum_cluster(self.k8s_api, cluster)

try:
Expand All @@ -499,25 +561,6 @@ def delete_nodegroup(

utils.kube_apply_patch(cluster_resource)

try:
for attempt in Retrying(
retry=retry_unless_exception_type(exceptions.MachineDeploymentNotFound),
stop=stop_after_delay(10),
wait=wait_fixed(1),
):
with attempt:
objects.MachineDeployment.for_node_group(
self.k8s_api, cluster, nodegroup
)
except exceptions.MachineDeploymentNotFound:
pass

nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.save()

cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
cluster.save()

@cluster_lock_wrapper
def get_monitor(self, context, cluster: magnum_objects.Cluster):
return monitor.Monitor(context, cluster)
Expand Down
Loading
Loading