Skip to content

Commit

Permalink
rm SA and retry OnError instead Condition
Browse files Browse the repository at this point in the history
  • Loading branch information
SrinivasAtmakuri committed Sep 18, 2023
1 parent fe72068 commit 4edfd77
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 261 deletions.
219 changes: 85 additions & 134 deletions pkg/frontend/admin_openshiftcluster_etcdcertificaterenew.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import (
"strings"
"time"

mgmtstorage "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2019-06-01/storage"
azstorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/go-chi/chi/v5"
configv1 "github.com/openshift/api/config/v1"
operatorv1 "github.com/openshift/api/operator/v1"
"github.com/sirupsen/logrus"
"github.com/ugorji/go/codec"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/strings/slices"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
Expand All @@ -33,23 +33,16 @@ import (
utilcert "github.com/Azure/ARO-RP/pkg/util/cert"
utilpem "github.com/Azure/ARO-RP/pkg/util/pem"
"github.com/Azure/ARO-RP/pkg/util/steps"
"github.com/Azure/ARO-RP/pkg/util/stringutils"
"github.com/Azure/ARO-RP/pkg/util/version"
)

type etcdrenew struct {
log *logrus.Entry
k adminactions.KubeActions
a adminactions.AzureActions
doc *api.OpenShiftClusterDocument
blobService *azstorage.BlobStorageClient
secretNames []string
mode string
backupSecrets map[string][]byte
lastRevision int32
etcdSecretsRecovered []string
copyComplete []string
deleteComplete []string
log *logrus.Entry
k adminactions.KubeActions
secretNames []string
mode string
backupSecrets map[string][]byte
lastRevision int32
}

var etcdOperatorControllerConditionsExpected = map[string]operatorv1.ConditionStatus{
Expand Down Expand Up @@ -88,18 +81,10 @@ func (e *etcdrenew) validateEtcdAndBackupDeleteSecretOnFlagSet(ctx context.Conte
s = append(s,
steps.Action(e.fetchEtcdCurrentRevision),
steps.Action(e.backupEtcdSecrets),
steps.Action(e.initializeblobService),
steps.Condition(e.copySecretsToBlobStorage, 5*time.Minute, true),
steps.Action(e.deleteEtcdSecrets),
)
}

if e.mode == "renewed" {
s = append(s,
steps.Condition(e.deleteSecretsFromBlobStorage, 5*time.Minute, true),
)
}

_, err := steps.Run(ctx, e.log, 10*time.Second, s, nil)
if err != nil {
return err
Expand All @@ -123,7 +108,7 @@ func (e *etcdrenew) isEtcDRootCertRenewed(ctx context.Context) error {
func (e *etcdrenew) revertChanges(ctx context.Context) error {
s := []steps.Step{
steps.Action(e.fetchEtcdCurrentRevision),
steps.Condition(e.recoverEtcdSecrets, 10*time.Minute, true),
steps.Action(e.recoverEtcdSecrets),
steps.Condition(e.isEtcdRevised, 30*time.Minute, true),
}
_, err := steps.Run(ctx, e.log, 10*time.Second, s, nil)
Expand Down Expand Up @@ -153,28 +138,13 @@ func (f *frontend) _postAdminOpenShiftClusterEtcdCertificateRenew(ctx context.Co
return err
}

subscriptionDoc, err := f.getSubscriptionDocument(ctx, doc.Key)
if err != nil {
return err
}
a, err := f.azureActionsFactory(log, f.env, doc.OpenShiftCluster, subscriptionDoc)
if err != nil {
return err
}

e := &etcdrenew{
log: log,
k: k,
a: a,
secretNames: nil,
mode: "",
backupSecrets: make(map[string][]byte),
lastRevision: 0,
etcdSecretsRecovered: make([]string, 9),
doc: doc,
blobService: &azstorage.BlobStorageClient{},
copyComplete: make([]string, 9),
deleteComplete: make([]string, 9),
log: log,
k: k,
secretNames: nil,
mode: "",
backupSecrets: make(map[string][]byte),
lastRevision: 0,
}

if err = e.validateClusterVersion(ctx); err != nil {
Expand All @@ -200,7 +170,10 @@ func (f *frontend) _postAdminOpenShiftClusterEtcdCertificateRenew(ctx context.Co
err = e.isEtcDRootCertRenewed(ctx)
if err != nil {
e.log.Infoln("Attempting to recover from backup, and wait for new revision to be applied after recovery")
return e.revertChanges(ctx)
if err = e.revertChanges(ctx); err != nil {
return err
}
return api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", "etcd renewal failed, recovery performed to revert the changes.")
}

e.log.Infoln("Etcd certificates are renewed and new revision is applied, verifying.")
Expand Down Expand Up @@ -337,87 +310,61 @@ func (e *etcdrenew) fetchEtcdCurrentRevision(ctx context.Context) error {
// backupEtcdSecrets fetches every secret named in e.secretNames from the
// namespaceEtcds namespace and stores a JSON-encoded copy in e.backupSecrets,
// keyed by secret name.
//
// Before encoding, the copy's CreationTimestamp is set to the current time and
// its ResourceVersion and UID are cleared (presumably so the backup can later
// be applied as a fresh object; confirm against recoverEtcdSecrets).
//
// Each secret is attempted up to 10 times, 2 seconds apart, but only when the
// failure is a Kubernetes BadRequest, InternalError or ServerTimeout error.
// Any other error, or exhausting the retries, aborts the backup and is
// returned to the caller.
func (e *etcdrenew) backupEtcdSecrets(ctx context.Context) error {
	e.log.Infoln("backing up etcd secrets now")
	for _, secretname := range e.secretNames {
		// The retry result must not be returned directly from inside the
		// loop: doing so would end the function after the first secret and
		// leave the remaining secrets without a backup.
		err := retry.OnError(wait.Backoff{
			Steps:    10,
			Duration: 2 * time.Second,
		}, func(err error) bool {
			return errors.IsBadRequest(err) || errors.IsInternalError(err) || errors.IsServerTimeout(err)
		}, func() error {
			e.log.Infof("Backing up secret %s", secretname)
			data, err := e.k.KubeGet(ctx, "Secret", namespaceEtcds, secretname)
			if err != nil {
				return err
			}
			secret := &corev1.Secret{}
			err = codec.NewDecoderBytes(data, &codec.JsonHandle{}).Decode(secret)
			if err != nil {
				return api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", fmt.Sprintf("failed to decode secret, %s", err.Error()))
			}
			secret.CreationTimestamp = metav1.Time{
				Time: time.Now(),
			}
			secret.ObjectMeta.ResourceVersion = ""
			secret.ObjectMeta.UID = ""

			var cert []byte
			err = codec.NewEncoderBytes(&cert, &codec.JsonHandle{}).Encode(secret)
			if err != nil {
				return api.NewCloudError(http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", fmt.Sprintf("failed to encode secret, %s", err.Error()))
			}
			e.backupSecrets[secretname] = cert
			return nil
		})
		if err != nil {
			return err
		}
	}

	e.log.Infoln("backing up etcd secrets done")
	return nil
}

func (e *etcdrenew) initializeblobService(ctx context.Context) error {
var err error
clusterRGName := stringutils.LastTokenByte(e.doc.OpenShiftCluster.Properties.ClusterProfile.ResourceGroupID, '/')
account := "cluster" + e.doc.OpenShiftCluster.Properties.StorageSuffix
e.blobService, err = e.a.BlobService(ctx, clusterRGName, account, mgmtstorage.Permissions("crwd"), mgmtstorage.SignedResourceTypesO)
return err
}

// saves etcd secrets into cluster StorageAccount, ARO container by secret name
func (e *etcdrenew) copySecretsToBlobStorage(ctx context.Context) (bool, error) {
e.log.Infoln("Copying secrets into blob storage")
for _, secret := range e.secretNames {
if slices.Contains(e.copyComplete, secret) {
continue
}
e.log.Infof("Copying secret %s", secret)
err := e.a.CopyAzureBlobStorage(ctx, "aro", secret, e.backupSecrets[secret], e.blobService)
if err != nil {
return false, nil
}
e.copyComplete = append(e.copyComplete, secret)
}
return true, nil
}

// On successful etcd renewal, deletes the secrets from the ARO container in the cluster StorageAccount
func (e *etcdrenew) deleteSecretsFromBlobStorage(ctx context.Context) (bool, error) {
e.log.Infoln("deleting secrets from blob storage")
for _, secret := range e.secretNames {
if slices.Contains(e.deleteComplete, secret) {
continue
}
e.log.Infof("deleting secret %s", secret)
err := e.a.DeleteAzureBlobStorage(ctx, "aro", secret, e.backupSecrets[secret], e.blobService)
if err != nil {
return false, nil
}
e.deleteComplete = append(e.deleteComplete, secret)
}
return true, nil
}

// delete the etcd secrets and on successful deletion,
// valid secrets will be recreated and a new revision will be applied by the etcd operator
func (e *etcdrenew) deleteEtcdSecrets(ctx context.Context) error {
e.log.Infoln("deleting etcd secrets now")
for _, secretname := range e.secretNames {
e.log.Infof("Deleting secret %s", secretname)
err := e.k.KubeDelete(ctx, "Secret", namespaceEtcds, secretname, false, nil)
if err != nil {
return err
}
e.log.Infof("Secret deleted %s", secretname)
return retry.OnError(wait.Backoff{
Steps: 10,
Duration: 2 * time.Second,
}, func(err error) bool {
return errors.IsBadRequest(err) || errors.IsInternalError(err) || errors.IsServerTimeout(err)
}, func() error {
e.log.Infof("Deleting secret %s", secretname)
err := e.k.KubeDelete(ctx, "Secret", namespaceEtcds, secretname, false, nil)
if err != nil {
return err
}
e.log.Infof("Secret deleted %s", secretname)
return nil
})
}

return nil
Expand Down Expand Up @@ -453,26 +400,30 @@ func (e *etcdrenew) isEtcdRevised(ctx context.Context) (bool, error) {
}

// Applies the backedup etcd secret and applies them on the cluster
func (e *etcdrenew) recoverEtcdSecrets(ctx context.Context) (bool, error) {
func (e *etcdrenew) recoverEtcdSecrets(ctx context.Context) error {
e.log.Infoln("recovering etcd secrets now")
for secretname, data := range e.backupSecrets {
// skip secrets which are already recovered
if slices.Contains(e.etcdSecretsRecovered, secretname) {
continue
}
e.log.Infof("Recovering secret %s", secretname)
obj := &unstructured.Unstructured{}
err := obj.UnmarshalJSON(data)
if err != nil {
return false, api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidRequestContent, "", "The request content was invalid and could not be deserialized: %q.", err)
}
err = e.k.KubeCreateOrUpdate(ctx, obj)
if err != nil {
return false, nil
}
e.etcdSecretsRecovered = append(e.etcdSecretsRecovered, secretname)
return retry.OnError(wait.Backoff{
Steps: 10,
Duration: 2 * time.Second,
}, func(err error) bool {
return errors.IsBadRequest(err) || errors.IsInternalError(err) || errors.IsServerTimeout(err)
}, func() error {
// skip secrets which are already recovered
e.log.Infof("Recovering secret %s", secretname)
obj := &unstructured.Unstructured{}
err := obj.UnmarshalJSON(data)
if err != nil {
return api.NewCloudError(http.StatusBadRequest, api.CloudErrorCodeInvalidRequestContent, "", "The request content was invalid and could not be deserialized: %q.", err)
}
err = e.k.KubeCreateOrUpdate(ctx, obj)
if err != nil {
return err
}
return nil
})
}
e.log.Infoln("recovered etcd secrets")

return true, nil
return nil
}
Loading

0 comments on commit 4edfd77

Please sign in to comment.