diff --git a/pkg/rke2/rke2.go b/pkg/rke2/rke2.go
index 689e0d434ae..35a8cda9033 100644
--- a/pkg/rke2/rke2.go
+++ b/pkg/rke2/rke2.go
@@ -103,7 +103,7 @@ func Server(clx *cli.Context, cfg Config) error {
 	}
 	dataDir := clx.String("data-dir")
 	cmds.ServerConfig.StartupHooks = append(cmds.ServerConfig.StartupHooks,
-		checkStaticManifests(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
+		reconcileStaticPods(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
 		setNetworkPolicies(cisMode, defaultNamespaces),
 		setClusterRoles(),
 		restrictServiceAccounts(cisMode, defaultNamespaces),
diff --git a/pkg/rke2/spw.go b/pkg/rke2/spw.go
index 8c9f269037f..8bceec38dee 100644
--- a/pkg/rke2/spw.go
+++ b/pkg/rke2/spw.go
@@ -2,7 +2,6 @@ package rke2
 
 import (
 	"context"
-	"encoding/json"
 	"os"
 	"path/filepath"
 	"sync"
@@ -11,6 +10,7 @@ import (
 	"github.com/k3s-io/k3s/pkg/agent/cri"
 	"github.com/k3s-io/k3s/pkg/cli/cmds"
 	"github.com/pkg/errors"
+	"github.com/rancher/rke2/pkg/staticpod"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -22,11 +22,11 @@ type containerInfo struct {
 	Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
 }
 
-// checkStaticManifests validates that the pods started with rke2 match the static manifests
-// provided in /var/lib/rancher/rke2/agent/pod-manifests. When restarting rke2, it takes time
-// for any changes to static manifests to be pulled by kubelet. Additionally this prevents errors
-// where something is wrong with the static manifests and RKE2 starts anyways.
-func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
+// reconcileStaticPods validates that the running pods for etcd and kube-apiserver match the static pod
+// manifests provided in /var/lib/rancher/rke2/agent/pod-manifests. If any old pods are found, they are
+// manually terminated, as the kubelet cannot be relied upon to terminate old pods when the apiserver is
+// not available.
+func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
 	return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
 		go func() {
 			defer wg.Done()
@@ -51,22 +51,22 @@ func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.Startup
 							// Since split-role servers exist, we don't care if no manifest is found
 							continue
 						}
-						logrus.Infof("Container for %s not found (%v), retrying", pod, err)
+						logrus.Infof("Pod for %s not synced (%v), retrying", pod, err)
 						return false, nil
 					}
-					logrus.Infof("Container for %s is running", pod)
+					logrus.Infof("Pod for %s is synced", pod)
 				}
 				return true, nil
 			}); err != nil {
-				logrus.Fatalf("Failed waiting for static pods to deploy: %v", err)
+				logrus.Fatalf("Failed waiting for static pods to sync: %v", err)
 			}
 		}()
 		return nil
 	}
 }
 
-// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and
-// verified as present and running with the current pod hash in the container runtime.
+// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
+// and exclusively running with the current pod UID. If old pods are found, they will be terminated and an error returned.
 func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
 	f, err := os.Open(manifestFile)
 	if err != nil {
@@ -81,43 +81,61 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi
 		return errors.Wrap(err, "failed to decode manifest")
 	}
 
-	var podHash string
-	for _, env := range podManifest.Spec.Containers[0].Env {
-		if env.Name == "POD_HASH" {
-			podHash = env.Value
-			break
-		}
-	}
-
-	filter := &runtimeapi.ContainerFilter{
-		State: &runtimeapi.ContainerStateValue{
-			State: runtimeapi.ContainerState_CONTAINER_RUNNING,
-		},
+	filter := &runtimeapi.PodSandboxFilter{
 		LabelSelector: map[string]string{
-			"io.kubernetes.pod.uid": string(podManifest.UID),
+			"component":                   podManifest.Labels["component"],
+			"io.kubernetes.pod.namespace": podManifest.Namespace,
+			"tier":                        podManifest.Labels["tier"],
 		},
 	}
-
-	resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
+	resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
 	if err != nil {
-		return errors.Wrap(err, "failed to list containers")
+		return errors.Wrap(err, "failed to list pods")
 	}
-	for _, container := range resp.Containers {
-		resp, err := cRuntime.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: container.Id, Verbose: true})
-		if err != nil {
-			return errors.Wrap(err, "failed to get container status")
-		}
-		info := &containerInfo{}
-		err = json.Unmarshal([]byte(resp.Info["info"]), &info)
-		if err != nil || info.Config == nil {
-			return errors.Wrap(err, "failed to unmarshal container config")
+	var currentPod, stalePod bool
+	for _, pod := range resp.Items {
+		if pod.Annotations["kubernetes.io/config.source"] != "file" {
+			continue
 		}
-		for _, env := range info.Config.Envs {
-			if env.Key == "POD_HASH" && env.Value == podHash {
-				return nil
+		if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
+			// Only mark the pod with a matching UID as current if it is actually ready
+			currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
+		} else {
+			stalePod = true
+
+			// Stop containers with a 10 second timeout before removing the pod. This sends the
+			// containers SIGTERM, and then SIGKILL 10 seconds later. Removing the pod directly just
+			// sends the containers SIGKILL immediately, which allows no time for orderly shutdown.
+			cfilter := &runtimeapi.ContainerFilter{
+				LabelSelector: map[string]string{
+					"io.kubernetes.pod.uid": pod.Labels["io.kubernetes.pod.uid"],
+				},
+			}
+			cresp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: cfilter})
+			if err != nil {
+				logrus.Warnf("Failed to list containers for pod %s: %v", pod.Metadata.Name, err)
+			} else {
+				for _, container := range cresp.Containers {
+					logrus.Infof("Stopping %s container for previous %s pod", container.Metadata.Name, pod.Metadata.Name)
+					if _, err := cRuntime.StopContainer(ctx, &runtimeapi.StopContainerRequest{ContainerId: container.Id, Timeout: *staticpod.DefaultTerminationGracePeriodSeconds}); err != nil {
+						logrus.Warnf("Failed to stop container %s for pod %s: %v", container.Metadata.Name, pod.Metadata.Name, err)
+					}
+				}
+			}
+
+			logrus.Infof("Deleting previous %s pod", pod.Metadata.Name)
+			if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
+				logrus.Warnf("Failed to remove old pod %s: %v", pod.Metadata.Name, err)
 			}
 		}
 	}
-	return errors.New("no matching container found")
+
+	if stalePod {
+		return errors.New("waiting for termination of old pod")
+	}
+	if !currentPod {
+		return errors.New("no current running pod found")
+	}
+	return nil
 }
diff --git a/pkg/staticpod/staticpod.go b/pkg/staticpod/staticpod.go
index 52a39fabe07..9c12cc8dc40 100644
--- a/pkg/staticpod/staticpod.go
+++ b/pkg/staticpod/staticpod.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/kubernetes/pkg/util/hash"
+	"k8s.io/utils/pointer"
 	"sigs.k8s.io/yaml"
 )
 
@@ -30,6 +31,10 @@ const (
 	extraMountPrefix = "extra-mount"
 )
 
+var (
+	DefaultTerminationGracePeriodSeconds = pointer.Int64(10)
+)
+
 type ProbeConf struct {
 	InitialDelaySeconds int32
 	TimeoutSeconds      int32
@@ -94,22 +99,16 @@ func Run(dir string, args Args) error {
 
 	manifestPath := filepath.Join(dir, args.Command+".yaml")
 
-	// Generate a stable UID based on the manifest path. This allows the kubelet to reconcile the pod's
-	// containers even when the apiserver is unavailable. If the UID is not stable, the kubelet
-	// will consider the manifest change as two separate add/remove operations, and may start the new pod
-	// before terminating the old one. Cleanup of removed pods is disabled until all sources have synced,
-	// so if the apiserver is down, the newly added pod may get stuck in a crash loop due to the old pod
-	// still using its ports. See https://github.com/rancher/rke2/issues/3387
+	// We hash the completed pod manifest and use that as the UID; this mimics what upstream does:
+	// https://github.com/kubernetes/kubernetes/blob/v1.24.0/pkg/kubelet/config/common.go#L58-68
+	// We manually terminate static pods with incorrect UIDs, as the kubelet cannot be relied
+	// upon to clean up the old one while the apiserver is down.
+	// See https://github.com/rancher/rke2/issues/3387 and https://github.com/rancher/rke2/issues/3725
 	hasher := md5.New()
-	fmt.Fprint(hasher, manifestPath)
-	pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
-
-	// Append a hash of the completed pod manifest to the container environment for later use when checking
-	// to see if the pod has been updated. It's fine that setting this changes the actual hash; we
-	// just need a stable values that we can compare between the file on disk and the running
-	// container to see if the kubelet has reconciled yet.
 	hash.DeepHashObject(hasher, pod)
-	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{Name: "POD_HASH", Value: hex.EncodeToString(hasher.Sum(nil)[0:])})
+	fmt.Fprintf(hasher, "host:%s", os.Getenv("NODE_NAME"))
+	fmt.Fprintf(hasher, "file:%s", manifestPath)
+	pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))
 
 	b, err := yaml.Marshal(pod)
 	if err != nil {
@@ -213,6 +212,8 @@ func pod(args Args) (*v1.Pod, error) {
 			HostNetwork:       true,
 			PriorityClassName: "system-cluster-critical",
 			SecurityContext:   args.SecurityContext,
+
+			TerminationGracePeriodSeconds: DefaultTerminationGracePeriodSeconds,
 		},
 	}
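
With this change the static pod UID is content-addressed: an md5 over the rendered pod object plus the node name and manifest path, so it only changes when one of those inputs changes, and stale sandboxes are cleaned up by reconcileStaticPods rather than by the kubelet. A minimal, self-contained sketch of that derivation follows; it is not the rke2 code itself, and `stablePodUID` and the `renderedPod` string are hypothetical stand-ins for feeding the v1.Pod through hash.DeepHashObject.

```go
// Sketch only: mirrors the shape of the UID derivation in pkg/staticpod/staticpod.go.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"os"
)

// stablePodUID hashes the rendered pod content, the node name, and the manifest path,
// so the UID is stable across restarts and changes whenever any of those inputs change.
func stablePodUID(renderedPod, manifestPath string) string {
	hasher := md5.New()
	fmt.Fprint(hasher, renderedPod)                        // stand-in for hash.DeepHashObject(hasher, pod)
	fmt.Fprintf(hasher, "host:%s", os.Getenv("NODE_NAME")) // ties the UID to this node
	fmt.Fprintf(hasher, "file:%s", manifestPath)           // ties the UID to the manifest location
	return hex.EncodeToString(hasher.Sum(nil))
}

func main() {
	uid := stablePodUID("apiVersion: v1\nkind: Pod\n# ...", "/var/lib/rancher/rke2/agent/pod-manifests/etcd.yaml")
	fmt.Println(uid) // identical inputs always yield the same UID; any edit yields a new one
}
```

Because any edit to the manifest produces a new UID, the kubelet sees an add of a new static pod rather than an update of the existing one, which is why checkManifestDeployed treats any file-sourced sandbox whose io.kubernetes.pod.uid label differs from the manifest as stale.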
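To see what reconcileStaticPods sees on a node, the same CRI calls can be issued by hand. The rough diagnostic sketch below is not part of this change: it assumes rke2's bundled containerd socket at /run/k3s/containerd/containerd.sock and the v1 CRI API; adjust both for your runtime.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Assumed socket path for rke2's bundled containerd; override if yours differs.
	conn, err := grpc.DialContext(ctx, "unix:///run/k3s/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	cRuntime := runtimeapi.NewRuntimeServiceClient(conn)
	resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
		Filter: &runtimeapi.PodSandboxFilter{
			LabelSelector: map[string]string{"io.kubernetes.pod.namespace": "kube-system"},
		},
	})
	if err != nil {
		panic(err)
	}

	for _, pod := range resp.Items {
		// Static pods are the only sandboxes sourced from manifest files on disk.
		if pod.Annotations["kubernetes.io/config.source"] != "file" {
			continue
		}
		fmt.Printf("%s\tuid=%s\tstate=%s\n", pod.Metadata.Name, pod.Labels["io.kubernetes.pod.uid"], pod.State)
	}
}
```

Each file-sourced sandbox printed here should carry the UID currently embedded in the matching manifest under /var/lib/rancher/rke2/agent/pod-manifests; anything else is what the new startup hook stops and removes.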