Update checking for a completed status of job containers
Previously, jobs were waited on for an arbitrary one minute before moving on. Now each job is watched until its pod reaches the Succeeded phase (the shell script exited successfully) or the context is cancelled.
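A rough sketch of that wait pattern follows. It is illustrative only, not code from this commit: the function name waitForJobPod, the client variable, and the use of the typed clientset are assumptions made for brevity, while the commit itself routes the watch through the KubeActions dynamic client (KubeWatch below).

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForJobPod blocks until a pod matching labelSelector reaches the
// Succeeded phase, or until ctx is cancelled.
func waitForJobPod(ctx context.Context, client kubernetes.Interface, namespace, labelSelector string) error {
	w, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{LabelSelector: labelSelector})
	if err != nil {
		return err
	}
	defer w.Stop()

	for {
		select {
		case event, ok := <-w.ResultChan():
			if !ok {
				return fmt.Errorf("watch closed before the pod reached %s", corev1.PodSucceeded)
			}
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				continue // not a pod event; keep waiting
			}
			if pod.Status.Phase == corev1.PodSucceeded {
				return nil // the job's shell script exited successfully
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Unlike the fixed one-minute sleep this replaces, such a loop returns as soon as the pod succeeds and respects cancellation.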
s-fairchild committed Jul 7, 2023
1 parent fa0f972 commit 4d54acd
Showing 4 changed files with 140 additions and 49 deletions.
22 changes: 22 additions & 0 deletions pkg/frontend/adminactions/kubeactions.go
@@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

@@ -37,6 +38,8 @@ type KubeActions interface {
ApproveAllCsrs(ctx context.Context) error
Upgrade(ctx context.Context, upgradeY bool) error
KubeGetPodLogs(ctx context.Context, namespace, name, containerName string) ([]byte, error)
// KubeWatch returns a watch object for the provided label selector key
KubeWatch(ctx context.Context, o *unstructured.Unstructured, label string) (watch.Interface, error)
}

type kubeActions struct {
@@ -149,6 +152,25 @@ func (k *kubeActions) KubeCreateOrUpdate(ctx context.Context, o *unstructured.Un
return err
}

func (k *kubeActions) KubeWatch(ctx context.Context, o *unstructured.Unstructured, labelKey string) (watch.Interface, error) {
gvr, err := k.gvrResolver.Resolve(o.GroupVersionKind().GroupKind().String(), o.GroupVersionKind().Version)
if err != nil {
return nil, err
}

listOpts := metav1.ListOptions{
Limit: 1000, // just in case
LabelSelector: o.GetLabels()[labelKey],
}

w, err := k.dyn.Resource(*gvr).Namespace(o.GetNamespace()).Watch(ctx, listOpts)
if err != nil {
return nil, err
}

return w, nil
}

func (k *kubeActions) KubeDelete(ctx context.Context, groupKind, namespace, name string, force bool, propagationPolicy *metav1.DeletionPropagation) error {
gvr, err := k.gvrResolver.Resolve(groupKind, "")
if err != nil {
69 changes: 35 additions & 34 deletions pkg/frontend/fixetcd.go
@@ -32,20 +32,17 @@ import (
)

const (
serviceAccountName = "etcd-recovery-privileged"
kubeServiceAccount = "system:serviceaccount" + namespaceEtcds + ":" + serviceAccountName
namespaceEtcds = "openshift-etcd"
image = "ubi8/ubi-minimal"
jobName = "etcd-recovery-"
jobNameDataBackup = jobName + "data-backup"
jobNameFixPeers = jobName + "fix-peers"
patchOverides = "unsupportedConfigOverrides:"
patchDisableOverrides = `{"useUnsupportedUnsafeNonHANonProductionUnstableEtcd": true}`
ctxKey timeoutAdjust = "TESTING" // Context key used by unit tests to change job wait time
serviceAccountName = "etcd-recovery-privileged"
kubeServiceAccount = "system:serviceaccount" + namespaceEtcds + ":" + serviceAccountName
namespaceEtcds = "openshift-etcd"
image = "ubi8/ubi-minimal"
jobName = "etcd-recovery-"
jobNameDataBackup = jobName + "data-backup"
jobNameFixPeers = jobName + "fix-peers"
patchOverides = "unsupportedConfigOverrides:"
patchDisableOverrides = `{"useUnsupportedUnsafeNonHANonProductionUnstableEtcd": true}`
)

type timeoutAdjust string

type degradedEtcd struct {
Node string
Pod string
@@ -284,14 +281,21 @@ func fixPeers(ctx context.Context, log *logrus.Entry, de *degradedEtcd, pods *co
return err
}

timeout := adjustTimeout(ctx, log)
log.Infof("Waiting for %s", jobNameFixPeers)
watcher, err := kubeActions.KubeWatch(ctx, jobFixPeers, "app")
if err != nil {
return err
}

select {
case <-time.After(timeout):
log.Infof("Finished waiting for job %s", jobNameFixPeers)
break
case event := <-watcher.ResultChan():
log.Infof("Waiting for %s to reach %s phase", jobFixPeers.GetName(), corev1.PodSucceeded)
pod := event.Object.(*corev1.Pod)

if pod.Status.Phase == corev1.PodSucceeded {
log.Infof("Job %s completed with %s", pod.GetName(), corev1.PodSucceeded)
}
case <-ctx.Done():
log.Warnf("Request cancelled while waiting for %s", jobNameFixPeers)
log.Warnf("Context was cancelled while waiting for %s", jobNameDataBackup)
}

log.Infof("Deleting %s now", jobNameFixPeers)
@@ -480,13 +484,21 @@ func backupEtcdData(ctx context.Context, log *logrus.Entry, cluster, node string
}
log.Infof("Job %s has been created", jobNameDataBackup)

timeout := adjustTimeout(ctx, log)
log.Infof("Waiting for %s", jobNameDataBackup)
watcher, err := kubeActions.KubeWatch(ctx, jobDataBackup, "app")
if err != nil {
return err
}

select {
case <-time.After(timeout):
log.Infof("Finished waiting for job %s", jobNameDataBackup)
case event := <-watcher.ResultChan():
log.Infof("Waiting for %s to reach %s phase", jobDataBackup.GetName(), corev1.PodSucceeded)
pod := event.Object.(*corev1.Pod)

if pod.Status.Phase == corev1.PodSucceeded {
log.Infof("Job %s completed with %s", pod.GetName(), corev1.PodSucceeded)
}
case <-ctx.Done():
log.Warnf("Request cancelled while waiting for %s", jobNameDataBackup)
log.Warnf("Context was cancelled while waiting for %s", jobNameDataBackup)
}

log.Infof("Deleting job %s now", jobNameDataBackup)
@@ -575,16 +587,6 @@ func createBackupEtcdDataJob(cluster, node string) *unstructured.Unstructured {
return j
}

// adjustTimeout is used to adjust the amount of time when waiting for jobs to complete when in a testing environment
func adjustTimeout(ctx context.Context, log *logrus.Entry) time.Duration {
if ctx.Value(ctxKey) == "TRUE" {
log.Info("Timeout adjusted for testing environment")
return time.Microsecond
}

return time.Minute
}

// comparePodEnvToIp compares the etcd container's environment variables to the pod's actual IP address
func comparePodEnvToIP(log *logrus.Entry, pods *corev1.PodList) (*degradedEtcd, error) {
degradedEtcds := []degradedEtcd{}
@@ -657,7 +659,6 @@ func ipFromEnv(containers []corev1.Container, podName string) string {
return ""
}

// TODO make this work for non status checks too
func findCrashloopingPods(log *logrus.Entry, pods *corev1.PodList) (*corev1.Pod, error) {
// pods are collected in a list to check for multiple crashing etcd instances
// multiple etcd failures aren't supported so an error will be returned, rather than assuming the first found is the only one
82 changes: 67 additions & 15 deletions pkg/frontend/fixetcd_test.go
@@ -9,12 +9,16 @@ import (
"errors"
"strings"
"testing"
"time"

"github.com/Azure/go-autorest/autorest/to"
operatorv1fake "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1/fake"
"github.com/ugorji/go/codec"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
ktesting "k8s.io/client-go/testing"

"github.com/Azure/ARO-RP/pkg/api"
@@ -26,9 +30,7 @@ import (
const degradedNode = "master-2"

func TestFixEtcd(t *testing.T) {
// TODO inject timeout duration
ctx := context.WithValue(context.Background(), ctxKey, "TRUE")

ctx := context.Background()
const (
mockSubID = "00000000-0000-0000-0000-000000000000"
mockTenantID = "00000000-0000-0000-0000-000000000000"
@@ -86,7 +88,11 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)

createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")

propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -111,7 +117,9 @@ func TestFixEtcd(t *testing.T) {
t.Fatal(err)
}

k.EXPECT().KubeCreateOrUpdate(ctx, newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)).MaxTimes(1).Return(nil)
jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobFixPeers, k, "app")
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameFixPeers, true, &propPolicy).MaxTimes(1).Return(nil)

// cleanup
@@ -144,7 +152,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -169,7 +179,9 @@ func TestFixEtcd(t *testing.T) {
t.Fatal(err)
}

k.EXPECT().KubeCreateOrUpdate(ctx, newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)).MaxTimes(1).Return(nil)
jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobFixPeers, k, "app")
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameFixPeers, true, &propPolicy).MaxTimes(1).Return(nil)

// cleanup
@@ -229,7 +241,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(errors.New(tt.wantErr))
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(errors.New(tt.wantErr))
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
},
},
{
@@ -245,7 +259,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -270,7 +286,9 @@ func TestFixEtcd(t *testing.T) {
t.Fatal(err)
}

k.EXPECT().KubeCreateOrUpdate(ctx, newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)).MaxTimes(1).Return(errors.New("oh no, can't create job fix peers"))
jobFixPeers := newJobFixPeers(doc.OpenShiftCluster.Name, peerPods, de.Node)
k.EXPECT().KubeCreateOrUpdate(ctx, jobFixPeers).MaxTimes(1).Return(errors.New("oh no, can't create job fix peers"))
createPodCompletedEvent(ctx, jobFixPeers, k, "app")

// cleanup
k.EXPECT().KubeDelete(ctx, serviceAcc.GetKind(), serviceAcc.GetNamespace(), serviceAcc.GetName(), true, nil).MaxTimes(1).Return(nil)
@@ -292,7 +310,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")

// fixPeers
serviceAcc := newServiceAccount(serviceAccountName, doc.OpenShiftCluster.Name)
@@ -322,7 +342,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -353,7 +375,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -387,7 +411,9 @@ func TestFixEtcd(t *testing.T) {
k.EXPECT().KubeList(ctx, "Pod", namespaceEtcds).MaxTimes(1).Return(buf.Bytes(), nil)

// backupEtcd
k.EXPECT().KubeCreateOrUpdate(ctx, createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))).MaxTimes(1).Return(nil)
jobBackupEtcd := createBackupEtcdDataJob(doc.OpenShiftCluster.Name, buildNodeName(doc, degradedNode))
k.EXPECT().KubeCreateOrUpdate(ctx, jobBackupEtcd).MaxTimes(1).Return(nil)
createPodCompletedEvent(ctx, jobBackupEtcd, k, "app")
propPolicy := metav1.DeletePropagationBackground
k.EXPECT().KubeDelete(ctx, "Job", namespaceEtcds, jobNameDataBackup, true, &propPolicy).MaxTimes(1).Return(nil)

@@ -463,6 +489,33 @@ func TestFixEtcd(t *testing.T) {
}
}

func createPodCompletedEvent(ctx context.Context, o *unstructured.Unstructured, k *mock_adminactions.MockKubeActions, labelKey string) {
k.EXPECT().KubeWatch(ctx, o, labelKey).AnyTimes().DoAndReturn(func(ctx context.Context, o *unstructured.Unstructured, labelKey string) (watch.Interface, error) {
w := watch.NewFake()
go func() {
time.Sleep(time.Microsecond)
w.Action(watch.Added, newCompletedPodWatchEvent(o))
}()
return w, nil
})
}

func newCompletedPodWatchEvent(o *unstructured.Unstructured) kruntime.Object {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: o.GetName(),
Namespace: o.GetNamespace(),
},
Status: corev1.PodStatus{
Phase: corev1.PodSucceeded,
},
}
}

func buildClusterName(doc *api.OpenShiftClusterDocument) string {
return doc.OpenShiftCluster.Name + "-" + doc.OpenShiftCluster.Properties.InfraID
}
@@ -472,7 +525,6 @@ func buildNodeName(doc *api.OpenShiftClusterDocument, node string) string {
return c + "-" + node
}

// TODO fix degraded pods to not create multiple conflicts every time
func newDegradedPods(doc *api.OpenShiftClusterDocument, multiDegraded, emptyEnv bool) *corev1.PodList {
var (
degradedNodeMaster2 = buildNodeName(doc, degradedNode)
16 changes: 16 additions & 0 deletions pkg/util/mocks/adminactions/adminactions.go

Some generated files are not rendered by default.
