diff --git a/pkg/rke2/rke2.go b/pkg/rke2/rke2.go
index 689e0d434a..35a8cda903 100644
--- a/pkg/rke2/rke2.go
+++ b/pkg/rke2/rke2.go
@@ -103,7 +103,7 @@ func Server(clx *cli.Context, cfg Config) error {
 	}
 	dataDir := clx.String("data-dir")
 	cmds.ServerConfig.StartupHooks = append(cmds.ServerConfig.StartupHooks,
-		checkStaticManifests(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
+		reconcileStaticPods(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
 		setNetworkPolicies(cisMode, defaultNamespaces),
 		setClusterRoles(),
 		restrictServiceAccounts(cisMode, defaultNamespaces),
diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go
index 8c9f269037..610230dcad 100644
--- a/pkg/rke2/spw.go
+++ b/pkg/rke2/spw.go
@@ -2,7 +2,6 @@ package rke2
 
 import (
 	"context"
-	"encoding/json"
 	"os"
 	"path/filepath"
 	"sync"
@@ -22,11 +21,11 @@ type containerInfo struct {
 	Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
 }
 
-// checkStaticManifests validates that the pods started with rke2 match the static manifests
-// provided in /var/lib/rancher/rke2/agent/pod-manifests. When restarting rke2, it takes time
-// for any changes to static manifests to be pulled by kubelet. Additionally this prevents errors
-// where something is wrong with the static manifests and RKE2 starts anyways.
-func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
+// reconcileStaticPods validates that the running pods for etcd and kube-apiserver match the static pod
+// manifests provided in /var/lib/rancher/rke2/agent/pod-manifests. If any old pods are found, they are
+// manually terminated, as the kubelet cannot be relied upon to terminate old pods when the apiserver is
+// not available.
+func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
 	return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
 		go func() {
 			defer wg.Done()
@@ -51,22 +50,22 @@ func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.Startup
 							// Since split-role servers exist, we don't care if no manifest is found
 							continue
 						}
 						logrus.Infof("Container for %s not found (%v), retrying", pod, err)
+						logrus.Infof("Pod for %s not synced (%v), retrying", pod, err)
 						return false, nil
 					}
-					logrus.Infof("Container for %s is running", pod)
+					logrus.Infof("Pod for %s is synced", pod)
 				}
 				return true, nil
 			}); err != nil {
-				logrus.Fatalf("Failed waiting for static pods to deploy: %v", err)
+				logrus.Fatalf("Failed waiting for static pods to sync: %v", err)
 			}
 		}()
 		return nil
 	}
}

-// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and
-// verified as present and running with the current pod hash in the container runtime.
+// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
+// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned.
 func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
 	f, err := os.Open(manifestFile)
 	if err != nil {
@@ -81,43 +80,38 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi
 		return errors.Wrap(err, "failed to decode manifest")
 	}
 
-	var podHash string
-	for _, env := range podManifest.Spec.Containers[0].Env {
-		if env.Name == "POD_HASH" {
-			podHash = env.Value
-			break
-		}
-	}
-
-	filter := &runtimeapi.ContainerFilter{
-		State: &runtimeapi.ContainerStateValue{
-			State: runtimeapi.ContainerState_CONTAINER_RUNNING,
-		},
+	filter := &runtimeapi.PodSandboxFilter{
 		LabelSelector: map[string]string{
-			"io.kubernetes.pod.uid": string(podManifest.UID),
+			"component":                   podManifest.Labels["component"],
+			"io.kubernetes.pod.namespace": podManifest.Namespace,
+			"tier":                        podManifest.Labels["tier"],
 		},
 	}
-
-	resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
+	resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
 	if err != nil {
-		return errors.Wrap(err, "failed to list containers")
+		return errors.Wrap(err, "failed to list pods")
 	}
-	for _, container := range resp.Containers {
-		resp, err := cRuntime.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: container.Id, Verbose: true})
-		if err != nil {
-			return errors.Wrap(err, "failed to get container status")
+
+	var currentPod, stalePod bool
+	for _, pod := range resp.Items {
+		if pod.Annotations["kubernetes.io/config.source"] != "file" {
+			continue
 		}
-		info := &containerInfo{}
-		err = json.Unmarshal([]byte(resp.Info["info"]), &info)
-		if err != nil || info.Config == nil {
-			return errors.Wrap(err, "failed to unmarshal container config")
-		}
-		for _, env := range info.Config.Envs {
-			if env.Key == "POD_HASH" && env.Value == podHash {
-				return nil
+		if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
+			currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
+		} else {
+			stalePod = true
+			if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
+				logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err)
			}
 		}
 	}
-	return errors.New("no matching container found")
+
+	if stalePod {
+		return errors.New("waiting for termination of old pod")
+	}
+	if !currentPod {
+		return errors.New("no current running pod found")
+	}
+	return nil
 }
 
diff --git a/pkg/staticpod/staticpod.go b/pkg/staticpod/staticpod.go
index 52a39fabe0..a64b134862 100644
--- a/pkg/staticpod/staticpod.go
+++ b/pkg/staticpod/staticpod.go
@@ -94,22 +94,15 @@ func Run(dir string, args Args) error {
 
 	manifestPath := filepath.Join(dir, args.Command+".yaml")
 
-	// Generate a stable UID based on the manifest path. This allows the kubelet to reconcile the pod's
-	// containers even when the apiserver is unavailable. If the UID is not stable, the kubelet
-	// will consider the manifest change as two separate add/remove operations, and may start the new pod
-	// before terminating the old one. Cleanup of removed pods is disabled until all sources have synced,
-	// so if the apiserver is down, the newly added pod may get stuck in a crash loop due to the old pod
-	// still using its ports. See https://github.com/rancher/rke2/issues/3387
+	// We hash the completed pod manifest and use that as the UID; this mimics what upstream does:
+	// https://github.com/kubernetes/kubernetes/blob/v1.24.0/pkg/kubelet/config/common.go#L58-68
+	// We manually terminate static pods with incorrect UIDs, as the kubelet cannot be relied
+	// upon to clean up the old one while the apiserver is down.
+	// See https://github.com/rancher/rke2/issues/3387 and https://github.com/rancher/rke2/issues/3725
 	hasher := md5.New()
-	fmt.Fprint(hasher, manifestPath)
-	pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
-
-	// Append a hash of the completed pod manifest to the container environment for later use when checking
-	// to see if the pod has been updated. It's fine that setting this changes the actual hash; we
-	// just need a stable values that we can compare between the file on disk and the running
-	// container to see if the kubelet has reconciled yet.
 	hash.DeepHashObject(hasher, pod)
-	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{Name: "POD_HASH", Value: hex.EncodeToString(hasher.Sum(nil)[0:])})
+	fmt.Fprintf(hasher, "file:%s", manifestPath)
+	pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
 
 	b, err := yaml.Marshal(pod)
 	if err != nil {
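For reference, here is the UID derivation from pkg/staticpod/staticpod.go pulled out as a standalone sketch. It assumes the same libraries the file already imports (k8s.io/kubernetes/pkg/util/hash for DeepHashObject); the staticPodUID helper and the main function below are illustrative names, not something this patch adds. The pod is hashed before its UID is assigned, which is why Run calls DeepHashObject first and sets pod.UID last.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/util/hash"
)

// staticPodUID mirrors the derivation in staticpod.Run: hash the completed pod
// object (DeepHashObject resets the hasher first), then mix in the manifest path
// with a "file:" prefix so the UID changes when either the content or the path does.
// The pod passed in must not have a UID set yet, or the result will not match.
func staticPodUID(pod *v1.Pod, manifestPath string) types.UID {
	hasher := md5.New()
	hash.DeepHashObject(hasher, pod)
	fmt.Fprintf(hasher, "file:%s", manifestPath)
	return types.UID(hex.EncodeToString(hasher.Sum(nil)))
}

func main() {
	pod := &v1.Pod{}
	pod.Name = "kube-apiserver"
	pod.Namespace = "kube-system"
	fmt.Println(staticPodUID(pod, "/var/lib/rancher/rke2/agent/pod-manifests/kube-apiserver.yaml"))
}

Because this value is written into the manifest's metadata.uid, it is also what checkManifestDeployed compares against the io.kubernetes.pod.uid label on the running sandbox when deciding whether the current pod is synced or an old pod still needs to be removed.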