diff --git a/api/core/v1alpha1/common_helpers.go b/api/core/v1alpha1/common_helpers.go index c5b09065..76171615 100644 --- a/api/core/v1alpha1/common_helpers.go +++ b/api/core/v1alpha1/common_helpers.go @@ -34,6 +34,7 @@ const ( ) var ( + LegacyOpVersion = semver.MustParse("1.2.0") LatestOpVersion = semver.MustParse("1.3.0") ) @@ -300,11 +301,11 @@ func (p *PodSet) GetSemVer() (*semver.Version, bool) { func (p *PodSet) GetOperatorVersion() semver.Version { if p.OperatorVersion == nil { - return LatestOpVersion + return LegacyOpVersion } v, err := semver.ParseTolerant(*p.OperatorVersion) if err != nil { - return LatestOpVersion + return LegacyOpVersion } return v } diff --git a/pkg/controllers/cnset/controller.go b/pkg/controllers/cnset/controller.go index 3f51dd0b..fd84669d 100644 --- a/pkg/controllers/cnset/controller.go +++ b/pkg/controllers/cnset/controller.go @@ -324,11 +324,14 @@ func syncCloneSet(ctx *recon.Context[*v1alpha1.CNSet], cs *kruisev1alpha1.CloneS // ref: https://openkruise.io/zh/docs/next/user-manuals/cloneset/#%E6%94%AF%E6%8C%81-pvc-%E6%A8%A1%E6%9D%BF syncPersistentVolumeClaim(cn, cs) - cm, err := buildCNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet) + cm, configSuffix, err := buildCNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet) if err != nil { return err } - return common.SyncConfigMap(ctx, &cs.Spec.Template.Spec, cm) + if cn.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + cs.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix + } + return common.SyncConfigMap(ctx, &cs.Spec.Template.Spec, cm, cn.Spec.GetOperatorVersion()) } func setReady(cn *v1alpha1.CNSet) { diff --git a/pkg/controllers/cnset/resource.go b/pkg/controllers/cnset/resource.go index 3196400a..4a7aa6cb 100644 --- a/pkg/controllers/cnset/resource.go +++ b/pkg/controllers/cnset/resource.go @@ -54,7 +54,11 @@ sql-address = "${POD_IP}:{{ .CNSQLPort }}" service-host = "${POD_IP}" EOF # build instance config -sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +if [ -n "${CONFIG_SUFFIX}" ]; then + sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }}-${CONFIG_SUFFIX} > ${conf} +else + sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +fi # append lock-service configs lsc=$(mktemp) @@ -196,6 +200,7 @@ func syncPodSpec(cn *v1alpha1.CNSet, cs *kruisev1alpha1.CloneSet, sp v1alpha1.Sh mainRef.Env = []corev1.EnvVar{ util.FieldRefEnv(common.PodNameEnvKey, "metadata.name"), util.FieldRefEnv(common.NamespaceEnvKey, "metadata.namespace"), + util.FieldRefEnv(common.ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", common.ConfigSuffixAnno)), {Name: common.HeadlessSvcEnvKey, Value: headlessSvcName(cn)}, util.FieldRefEnv(common.PodIPEnvKey, "status.podIP"), } @@ -249,9 +254,9 @@ func syncPodSpec(cn *v1alpha1.CNSet, cs *kruisev1alpha1.CloneSet, sp v1alpha1.Sh } } -func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { +func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, string, error) { if ls.Status.Discovery == nil { - return nil, errors.New("logset had not yet exposed HAKeeper discovery address") + return nil, "", errors.New("logset had not yet exposed HAKeeper discovery address") } cfg := cn.Spec.Config if cfg == nil { @@ -285,7 +290,7 @@ func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.Confi } s, err := cfg.ToString() if err != nil { - return nil, err + return nil, "", err } buff := new(bytes.Buffer) err = startScriptTpl.Execute(buff, &model{ @@ -295,9 +300,10 @@ func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.Confi LockServicePort: common.LockServicePort, }) if err != nil { - return nil, err + return nil, "", err } + configSuffix := common.DataDigest([]byte(s)) return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: configMapName(cn), @@ -305,8 +311,8 @@ func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.Confi Labels: common.SubResourceLabels(cn), }, Data: map[string]string{ - common.ConfigFile: s, + fmt.Sprintf("%s-%s", common.ConfigFile, configSuffix): s, common.Entrypoint: buff.String(), }, - }, nil + }, configSuffix, nil } diff --git a/pkg/controllers/cnset/resource_test.go b/pkg/controllers/cnset/resource_test.go index f2fadeba..2199e16a 100644 --- a/pkg/controllers/cnset/resource_test.go +++ b/pkg/controllers/cnset/resource_test.go @@ -227,13 +227,14 @@ service-addresses = [] for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewGomegaWithT(t) - got, err := buildCNSetConfigMap(tt.args.cn, tt.args.ls) + got, configSuffix, err := buildCNSetConfigMap(tt.args.cn, tt.args.ls) if (err != nil) != tt.wantErr { t.Errorf("buildDNSetConfigMap() error = %v, wantErr %v", err, tt.wantErr) return } - g.Expect(got.Data["config.toml"]).NotTo(BeNil()) - g.Expect(cmp.Diff(tt.wantConfig, got.Data["config.toml"])).To(BeEmpty()) + configKey := "config.toml-" + configSuffix + g.Expect(got.Data[configKey]).NotTo(BeNil()) + g.Expect(cmp.Diff(tt.wantConfig, got.Data[configKey])).To(BeEmpty()) }) } } diff --git a/pkg/controllers/common/configmap.go b/pkg/controllers/common/configmap.go index db1bf53f..e1b2a4c6 100644 --- a/pkg/controllers/common/configmap.go +++ b/pkg/controllers/common/configmap.go @@ -17,12 +17,16 @@ package common import ( "encoding/json" "fmt" + "github.com/blang/semver/v4" "github.com/cespare/xxhash" "github.com/go-errors/errors" recon "github.com/matrixorigin/controller-runtime/pkg/reconciler" "github.com/matrixorigin/controller-runtime/pkg/util" + "github.com/matrixorigin/matrixone-operator/api/core/v1alpha1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + "strings" ) const ( @@ -38,14 +42,19 @@ const ( // SyncConfigMap syncs the desired configmap for pods, which will cause rolling-update if the // data of the configmap is changed -func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1.ConfigMap) error { +func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1.ConfigMap, operatorVersion semver.Version) error { var currentCmName string + var desiredName string + var err error vp := util.FindFirst(podSpec.Volumes, util.WithVolumeName("config")) if vp != nil { currentCmName = vp.Name } - // TODO(aylei): GC stale configmaps (maybe in another worker?) - desiredName, err := ensureConfigMap(kubeCli, currentCmName, cm) + if operatorVersion.Equals(v1alpha1.LatestOpVersion) { + desiredName, err = ensureConfigMap(kubeCli, cm) + } else { + desiredName, err = ensureConfigMapLegacy(kubeCli, currentCmName, cm) + } if err != nil { return err } @@ -66,11 +75,41 @@ func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1 } // ensureConfigMap ensures the configmap exist in k8s -func ensureConfigMap(kubeCli recon.KubeClient, currentCm string, desired *corev1.ConfigMap) (string, error) { +func ensureConfigMap(kubeCli recon.KubeClient, desired *corev1.ConfigMap) (string, error) { c := desired.DeepCopy() - if err := addConfigMapDigest(c); err != nil { + old := &corev1.ConfigMap{} + exist, err := kubeCli.Exist(client.ObjectKeyFromObject(c), old) + if err != nil { return "", err } + if exist { + podList := &corev1.PodList{} + err = kubeCli.List(podList, client.InNamespace(c.Namespace)) + if err != nil { + return "", err + } + for key, v := range old.Data { + if withDigest(key, v) && configInUse(key, podList.Items) { + // append item that is still in use + c.Data[key] = v + } + } + err = kubeCli.Update(c) + } else { + err = kubeCli.CreateOwned(c) + } + if err != nil { + return "", err + } + return c.Name, nil +} + +// Deprecated: use ensureConfigMap instead +func ensureConfigMapLegacy(kubeCli recon.KubeClient, currentCm string, desired *corev1.ConfigMap) (string, error) { + c := desired.DeepCopy() + if err := addConfigMapDigest(c); err != nil { + return "", errors.Wrap(err, 0) + } // config digest not changed if c.Name == currentCm { return currentCm, nil @@ -78,11 +117,30 @@ func ensureConfigMap(kubeCli recon.KubeClient, currentCm string, desired *corev1 // otherwise ensure the configmap exists err := util.Ignore(apierrors.IsAlreadyExists, kubeCli.CreateOwned(c)) if err != nil { - return "", err + return "", errors.Wrap(err, 0) } return c.Name, nil } +func withDigest(key string, v string) bool { + return strings.Contains(key, DataDigest([]byte(v))) +} + +func configInUse(key string, podList []corev1.Pod) bool { + for _, pod := range podList { + s := pod.Annotations[ConfigSuffixAnno] + if len(s) > 0 && strings.Contains(key, s) { + return true + } + } + return false +} + +func DataDigest(data []byte) string { + sum := xxhash.Sum64(data) + return fmt.Sprintf("%x", sum)[0:7] +} + func addConfigMapDigest(cm *corev1.ConfigMap) error { s, err := json.Marshal(cm.Data) if err != nil { diff --git a/pkg/controllers/common/configmap_test.go b/pkg/controllers/common/configmap_test.go index cd6c9db0..3cfe4e71 100644 --- a/pkg/controllers/common/configmap_test.go +++ b/pkg/controllers/common/configmap_test.go @@ -22,7 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestAddConfigMapDigest(t *testing.T) { +func TestDataDigest(t *testing.T) { // need fuzz? cmList := []*corev1.ConfigMap{ newCM(""), @@ -33,8 +33,8 @@ func TestAddConfigMapDigest(t *testing.T) { } g := NewGomegaWithT(t) for _, cm := range cmList { - g.Expect(addConfigMapDigest(cm)).To(Succeed()) - g.Expect(utf8string.NewString(cm.Name).IsASCII()).To(BeTrue()) + digest := DataDigest([]byte(cm.Data["config"])) + g.Expect(utf8string.NewString(digest).IsASCII()).To(BeTrue()) } } diff --git a/pkg/controllers/common/consts.go b/pkg/controllers/common/consts.go index af3bd141..de356915 100644 --- a/pkg/controllers/common/consts.go +++ b/pkg/controllers/common/consts.go @@ -31,4 +31,6 @@ const ( LabelManagedBy = "matrixorigin.io/managed-by" LabelOwnerUID = "matrixorigin.io/owner-uid" + + ConfigSuffixAnno = "matrixorigin.io/config-suffix" ) diff --git a/pkg/controllers/common/podset.go b/pkg/controllers/common/podset.go index e245c555..b9504ee6 100644 --- a/pkg/controllers/common/podset.go +++ b/pkg/controllers/common/podset.go @@ -35,7 +35,7 @@ type SyncMOPodTask struct { ConfigMap *corev1.ConfigMap KubeCli recon.KubeClient StorageProvider *v1alpha1.SharedStorageProvider - + ConfigSuffix string // optional MutateContainer func(c *corev1.Container) MutatePod func(p *corev1.PodTemplateSpec) @@ -74,7 +74,7 @@ func GetSemanticVersion(meta *metav1.ObjectMeta) semver.Version { // SyncMOPod execute the given SyncMOPodTask which keeps the pod spec update to date func SyncMOPod(t *SyncMOPodTask) error { syncPodTemplate(t) - if err := SyncConfigMap(t.KubeCli, &t.TargetTemplate.Spec, t.ConfigMap); err != nil { + if err := SyncConfigMap(t.KubeCli, &t.TargetTemplate.Spec, t.ConfigMap, t.PodSet.GetOperatorVersion()); err != nil { return errors.WrapPrefix(err, "sync configmap", 0) } return nil @@ -106,6 +106,9 @@ func syncPodTemplate(t *SyncMOPodTask) { if t.MutatePod != nil { t.MutatePod(t.TargetTemplate) } + if t.PodSet.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + t.TargetTemplate.ObjectMeta.Annotations[ConfigSuffixAnno] = t.ConfigSuffix + } p.Overlay.OverlayPodMeta(&t.TargetTemplate.ObjectMeta) p.Overlay.OverlayPodSpec(specRef) @@ -119,6 +122,7 @@ func syncMainContainer(p *v1alpha1.PodSet, c *corev1.Container, mutateFn func(c c.Env = []corev1.EnvVar{ util.FieldRefEnv(PodNameEnvKey, "metadata.name"), util.FieldRefEnv(NamespaceEnvKey, "metadata.namespace"), + util.FieldRefEnv(ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", ConfigSuffixAnno)), } memLimitEnv := GoMemLimitEnv(p.MemoryLimitPercent, c.Resources.Limits.Memory(), p.Overlay) if memLimitEnv != nil { diff --git a/pkg/controllers/common/template.go b/pkg/controllers/common/template.go index dd299809..b1b35c5a 100644 --- a/pkg/controllers/common/template.go +++ b/pkg/controllers/common/template.go @@ -59,6 +59,8 @@ const ( NamespaceEnvKey = "NAMESPACE" // PodIPEnvKey is the container environment variable to reflect the IP of the Pod that runs the container PodIPEnvKey = "POD_IP" + // ConfigSuffixEnvKey is the container environment variable to reflect the config suffix + ConfigSuffixEnvKey = "CONFIG_SUFFIX" ) // SubResourceLabels generate labels for sub-resources diff --git a/pkg/controllers/dnset/controller.go b/pkg/controllers/dnset/controller.go index 4745452f..fea66f99 100644 --- a/pkg/controllers/dnset/controller.go +++ b/pkg/controllers/dnset/controller.go @@ -186,12 +186,14 @@ func (d *Actor) Create(ctx *recon.Context[*v1alpha1.DNSet]) error { syncPodSpec(dn, dnSet, ctx.Dep.Deps.LogSet.Spec.SharedStorage) syncPersistentVolumeClaim(dn, dnSet) - configMap, err := buildDNSetConfigMap(dn, ctx.Dep.Deps.LogSet) + configMap, configSuffix, err := buildDNSetConfigMap(dn, ctx.Dep.Deps.LogSet) if err != nil { return err } - - if err := common.SyncConfigMap(ctx, &dnSet.Spec.Template.Spec, configMap); err != nil { + if dn.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + dnSet.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix + } + if err := common.SyncConfigMap(ctx, &dnSet.Spec.Template.Spec, configMap, dn.Spec.GetOperatorVersion()); err != nil { return err } diff --git a/pkg/controllers/dnset/resource.go b/pkg/controllers/dnset/resource.go index 7abfd875..9fe92f44 100644 --- a/pkg/controllers/dnset/resource.go +++ b/pkg/controllers/dnset/resource.go @@ -62,7 +62,11 @@ service-address = "${ADDR}:{{ .DNServicePort }}" service-host = "${ADDR}" EOF # build instance config -sed "/\[{{ .ConfigAlias }}\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +if [ -n "${CONFIG_SUFFIX}" ]; then + sed "/\[{{ .ConfigAlias }}\]/r ${bc}" {{ .ConfigFilePath }}-${CONFIG_SUFFIX} > ${conf} +else + sed "/\[{{ .ConfigAlias }}\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +fi # append lock-service configs lsc=$(mktemp) @@ -152,6 +156,7 @@ func syncPodSpec(dn *v1alpha1.DNSet, sts *kruise.StatefulSet, sp v1alpha1.Shared mainRef.Env = []corev1.EnvVar{ util.FieldRefEnv(common.PodNameEnvKey, "metadata.name"), util.FieldRefEnv(common.NamespaceEnvKey, "metadata.namespace"), + util.FieldRefEnv(common.ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", common.ConfigSuffixAnno)), {Name: common.HeadlessSvcEnvKey, Value: headlessSvcName(dn)}, } memLimitEnv := common.GoMemLimitEnv(dn.Spec.MemoryLimitPercent, dn.Spec.Resources.Limits.Memory(), dn.Spec.Overlay) @@ -177,9 +182,9 @@ func syncPodSpec(dn *v1alpha1.DNSet, sts *kruise.StatefulSet, sp v1alpha1.Shared } // buildDNSetConfigMap return dn set configmap -func buildDNSetConfigMap(dn *v1alpha1.DNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { +func buildDNSetConfigMap(dn *v1alpha1.DNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, string, error) { if ls.Status.Discovery == nil { - return nil, errors.New("HAKeeper discovery address not ready") + return nil, "", errors.New("HAKeeper discovery address not ready") } conf := dn.Spec.Config if conf == nil { @@ -208,7 +213,7 @@ func buildDNSetConfigMap(dn *v1alpha1.DNSet, ls *v1alpha1.LogSet) (*corev1.Confi } s, err := conf.ToString() if err != nil { - return nil, err + return nil, "", err } buff := new(bytes.Buffer) @@ -220,16 +225,17 @@ func buildDNSetConfigMap(dn *v1alpha1.DNSet, ls *v1alpha1.LogSet) (*corev1.Confi ConfigAlias: configAlias, }) if err != nil { - return nil, err + return nil, "", err } + configSuffix := common.DataDigest([]byte(s)) return &corev1.ConfigMap{ ObjectMeta: common.ObjMetaTemplate(dn, configMapName(dn)), Data: map[string]string{ - common.ConfigFile: s, + fmt.Sprintf("%s-%s", common.ConfigFile, configSuffix): s, common.Entrypoint: buff.String(), }, - }, nil + }, configSuffix, nil } func buildHeadlessSvc(dn *v1alpha1.DNSet) *corev1.Service { @@ -250,16 +256,18 @@ func syncPersistentVolumeClaim(dn *v1alpha1.DNSet, sts *kruise.StatefulSet) { } func syncPods(ctx *recon.Context[*v1alpha1.DNSet], sts *kruise.StatefulSet) error { - cm, err := buildDNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet) + cm, configSuffix, err := buildDNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet) if err != nil { return err } - syncPodMeta(ctx.Obj, sts) + if ctx.Obj.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + sts.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix + } if ctx.Dep != nil { syncPodSpec(ctx.Obj, sts, ctx.Dep.Deps.LogSet.Spec.SharedStorage) } - return common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm) + return common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm, ctx.Obj.Spec.GetOperatorVersion()) } diff --git a/pkg/controllers/dnset/resource_test.go b/pkg/controllers/dnset/resource_test.go index e4a45358..52ff6de3 100644 --- a/pkg/controllers/dnset/resource_test.go +++ b/pkg/controllers/dnset/resource_test.go @@ -245,13 +245,14 @@ discovery-address = "test:6001" for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewGomegaWithT(t) - got, err := buildDNSetConfigMap(tt.args.dn, tt.args.ls) + got, configSuffix, err := buildDNSetConfigMap(tt.args.dn, tt.args.ls) if (err != nil) != tt.wantErr { t.Errorf("buildDNSetConfigMap() error = %v, wantErr %v", err, tt.wantErr) return } - g.Expect(got.Data["config.toml"]).NotTo(BeNil()) - g.Expect(cmp.Diff(tt.wantConfig, got.Data["config.toml"])).To(BeEmpty()) + configKey := "config.toml-" + configSuffix + g.Expect(got.Data[configKey]).NotTo(BeNil()) + g.Expect(cmp.Diff(tt.wantConfig, got.Data[configKey])).To(BeEmpty()) }) } } diff --git a/pkg/controllers/logset/configmap.go b/pkg/controllers/logset/configmap.go index b881982f..7d34cb7a 100644 --- a/pkg/controllers/logset/configmap.go +++ b/pkg/controllers/logset/configmap.go @@ -68,7 +68,11 @@ gossip-address-v2 = "${ADDR}:{{ .GossipPort }}" EOF # build instance config -sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +if [ -n "${CONFIG_SUFFIX}" ]; then + sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }}-${CONFIG_SUFFIX} > ${conf} +else + sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +fi # insert gossip config gossipTmp=$(mktemp) @@ -124,7 +128,11 @@ gossip-port = {{ .GossipPort }} EOF # build instance config -sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +if [ -n "${CONFIG_SUFFIX}" ]; then + sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }}-${CONFIG_SUFFIX} > ${conf} +else + sed "/\[logservice\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} +fi # insert gossip config gossipTmp=$(mktemp) @@ -185,7 +193,7 @@ func buildGossipSeedsConfigMap(ls *v1alpha1.LogSet, sts *kruisev1.StatefulSet) ( } // buildConfigMap build the configmap for log service -func buildConfigMap(ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { +func buildConfigMap(ls *v1alpha1.LogSet) (*corev1.ConfigMap, string, error) { conf := ls.Spec.Config if conf == nil { conf = v1alpha1.NewTomlConfig(map[string]interface{}{}) @@ -210,7 +218,7 @@ func buildConfigMap(ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { } s, err := conf.ToString() if err != nil { - return nil, err + return nil, "", err } // 2. build the start script @@ -228,11 +236,11 @@ func buildConfigMap(ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { BootstrapFilePath: fmt.Sprintf("%s/%s", bootstrapPath, bootstrapFile), GossipFilePath: fmt.Sprintf("%s/%s", gossipPath, gossipFile), }) - if err != nil { - return nil, err + return nil, "", err } + configSuffix := common.DataDigest([]byte(s)) return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: ls.Namespace, @@ -240,10 +248,10 @@ func buildConfigMap(ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { Labels: common.SubResourceLabels(ls), }, Data: map[string]string{ - configFile: s, + fmt.Sprintf("%s-%s", configFile, configSuffix): s, entrypoint: buff.String(), }, - }, nil + }, configSuffix, nil } func HaKeeperAdds(ls *v1alpha1.LogSet) []string { diff --git a/pkg/controllers/logset/controller.go b/pkg/controllers/logset/controller.go index 24748846..17b41d7a 100644 --- a/pkg/controllers/logset/controller.go +++ b/pkg/controllers/logset/controller.go @@ -164,11 +164,14 @@ func (r *Actor) Create(ctx *recon.Context[*v1alpha1.LogSet]) error { return err } // sync the config - cm, err := buildConfigMap(ls) + cm, configSuffix, err := buildConfigMap(ls) if err != nil { return err } - if err := common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm); err != nil { + if ls.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + sts.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix + } + if err := common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm, ls.Spec.GetOperatorVersion()); err != nil { return err } @@ -367,13 +370,16 @@ func updateGossipConfig(ctx *recon.Context[*v1alpha1.LogSet], sts *kruisev1.Stat } func syncPods(ctx *recon.Context[*v1alpha1.LogSet], sts *kruisev1.StatefulSet) error { - cm, err := buildConfigMap(ctx.Obj) + cm, configSuffix, err := buildConfigMap(ctx.Obj) if err != nil { return err } syncPodMeta(ctx.Obj, sts) + if ctx.Obj.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) { + sts.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix + } syncPodSpec(ctx.Obj, &sts.Spec.Template.Spec) - return common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm) + return common.SyncConfigMap(ctx, &sts.Spec.Template.Spec, cm, ctx.Obj.Spec.GetOperatorVersion()) } func (r *Actor) Reconcile(mgr manager.Manager) error { diff --git a/pkg/controllers/logset/sts.go b/pkg/controllers/logset/sts.go index dd25998e..7c7548d5 100644 --- a/pkg/controllers/logset/sts.go +++ b/pkg/controllers/logset/sts.go @@ -78,6 +78,7 @@ func syncPodSpec(ls *v1alpha1.LogSet, specRef *corev1.PodSpec) { mainRef.Env = []corev1.EnvVar{ util.FieldRefEnv(PodNameEnvKey, "metadata.name"), util.FieldRefEnv(NamespaceEnvKey, "metadata.namespace"), + util.FieldRefEnv(common.ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", common.ConfigSuffixAnno)), util.FieldRefEnv(PodIPEnvKey, "status.podIP"), {Name: HeadlessSvcEnvKey, Value: headlessSvcName(ls)}, } diff --git a/pkg/controllers/logset/sts_test.go b/pkg/controllers/logset/sts_test.go index f4bf3f6f..86a83b94 100644 --- a/pkg/controllers/logset/sts_test.go +++ b/pkg/controllers/logset/sts_test.go @@ -116,6 +116,14 @@ func Test_syncPodSpec(t *testing.T) { FieldPath: "metadata.namespace", }, }, + }, { + Name: "CONFIG_SUFFIX", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.annotations['matrixorigin.io/config-suffix']", + }, + }, }, { Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{ diff --git a/pkg/controllers/proxyset/resource.go b/pkg/controllers/proxyset/resource.go index 929fa73b..a1e0b415 100644 --- a/pkg/controllers/proxyset/resource.go +++ b/pkg/controllers/proxyset/resource.go @@ -58,8 +58,11 @@ cat < ${bc} uuid = "${UUID}" EOF # build instance config -sed "/\[proxy\]/r ${bc}" {{ .ConfigFilePath }} > ${conf} - +if [ -n "${CONFIG_SUFFIX}" ]; then + sed "/\[proxy\]/r ${bc}" "{{ .ConfigFilePath }}-${CONFIG_SUFFIX}" > ${conf} +else + sed "/\[proxy\]/r ${bc}" "{{ .ConfigFilePath }}" > ${conf} +fi echo "/mo-service -cfg ${conf} $@" exec /mo-service -cfg ${conf} $@ `)) @@ -69,7 +72,7 @@ func buildCloneSet(proxy *v1alpha1.ProxySet) *kruisev1alpha1.CloneSet { } func syncCloneSet(ctx *recon.Context[*v1alpha1.ProxySet], proxy *v1alpha1.ProxySet, cs *kruisev1alpha1.CloneSet) error { - cm, err := buildProxyConfigMap(proxy, ctx.Dep.Deps.LogSet) + cm, configSuffix, err := buildProxyConfigMap(proxy, ctx.Dep.Deps.LogSet) if err != nil { return errors.WrapPrefix(err, "build configmap", 0) } @@ -81,6 +84,7 @@ func syncCloneSet(ctx *recon.Context[*v1alpha1.ProxySet], proxy *v1alpha1.ProxyS ConfigMap: cm, KubeCli: ctx, StorageProvider: &ctx.Dep.Deps.LogSet.Spec.SharedStorage, + ConfigSuffix: configSuffix, MutateContainer: syncMainContainer, }) } @@ -145,9 +149,9 @@ func syncSvc(proxy *v1alpha1.ProxySet, svc *corev1.Service) { } } -func buildProxyConfigMap(proxy *v1alpha1.ProxySet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) { +func buildProxyConfigMap(proxy *v1alpha1.ProxySet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, string, error) { if ls.Status.Discovery == nil { - return nil, errors.New("HAKeeper discovery address not ready") + return nil, "", errors.New("HAKeeper discovery address not ready") } conf := proxy.Spec.Config if conf == nil { @@ -162,7 +166,7 @@ func buildProxyConfigMap(proxy *v1alpha1.ProxySet, ls *v1alpha1.LogSet) (*corev1 } s, err := conf.ToString() if err != nil { - return nil, err + return nil, "", err } buff := new(bytes.Buffer) @@ -170,16 +174,17 @@ func buildProxyConfigMap(proxy *v1alpha1.ProxySet, ls *v1alpha1.LogSet) (*corev1 ConfigFilePath: fmt.Sprintf("%s/%s", common.ConfigPath, common.ConfigFile), }) if err != nil { - return nil, err + return nil, "", err } + configSuffix := common.DataDigest([]byte(s)) return &corev1.ConfigMap{ ObjectMeta: configMapKey(proxy), Data: map[string]string{ - common.ConfigFile: s, + fmt.Sprintf("%s-%s", common.ConfigFile, configSuffix): s, common.Entrypoint: buff.String(), }, - }, nil + }, configSuffix, nil } func configMapKey(p *v1alpha1.ProxySet) metav1.ObjectMeta { diff --git a/pkg/controllers/webui/controller.go b/pkg/controllers/webui/controller.go index 9f79815c..e692d443 100644 --- a/pkg/controllers/webui/controller.go +++ b/pkg/controllers/webui/controller.go @@ -160,7 +160,7 @@ func (w *Actor) Create(ctx *recon.Context[*v1alpha1.WebUI]) error { return err } - if err := common.SyncConfigMap(ctx, &wiObj.Spec.Template.Spec, configMap); err != nil { + if err := common.SyncConfigMap(ctx, &wiObj.Spec.Template.Spec, configMap, wi.Spec.GetOperatorVersion()); err != nil { return err } diff --git a/pkg/controllers/webui/resource.go b/pkg/controllers/webui/resource.go index a9f9759e..412f213a 100644 --- a/pkg/controllers/webui/resource.go +++ b/pkg/controllers/webui/resource.go @@ -135,7 +135,7 @@ func syncPods(ctx *recon.Context[*v1alpha1.WebUI], dp *appsv1.Deployment) error syncPodMeta(ctx.Obj, dp) syncPodSpec(ctx.Obj, dp) - return common.SyncConfigMap(ctx, &dp.Spec.Template.Spec, cm) + return common.SyncConfigMap(ctx, &dp.Spec.Template.Spec, cm, ctx.Obj.Spec.GetOperatorVersion()) } func syncServiceType(wi *v1alpha1.WebUI, svc *corev1.Service) { diff --git a/pkg/webhook/cnset_webhook.go b/pkg/webhook/cnset_webhook.go index 54eb8882..a3a4e446 100644 --- a/pkg/webhook/cnset_webhook.go +++ b/pkg/webhook/cnset_webhook.go @@ -54,7 +54,7 @@ type cnSetDefaulter struct{} var _ webhook.CustomDefaulter = &cnSetDefaulter{} -func (c *cnSetDefaulter) Default(_ context.Context, obj runtime.Object) error { +func (c *cnSetDefaulter) Default(ctx context.Context, obj runtime.Object) error { cnSet, ok := obj.(*v1alpha1.CNSet) if !ok { return unexpectedKindError("CNSet", obj) @@ -64,7 +64,7 @@ func (c *cnSetDefaulter) Default(_ context.Context, obj runtime.Object) error { if cnSet.Spec.Role == "" { cnSet.Spec.Role = v1alpha1.CNRoleTP } - return nil + return setDefaultOperatorVersion(ctx, &cnSet.Spec.PodSet) } func (c *cnSetDefaulter) DefaultSpec(spec *v1alpha1.CNSetSpec) { diff --git a/pkg/webhook/dnset_webhook.go b/pkg/webhook/dnset_webhook.go index 9be19259..eea9a2f1 100644 --- a/pkg/webhook/dnset_webhook.go +++ b/pkg/webhook/dnset_webhook.go @@ -43,13 +43,13 @@ type dnSetDefaulter struct{} var _ webhook.CustomDefaulter = &dnSetDefaulter{} -func (d *dnSetDefaulter) Default(_ context.Context, obj runtime.Object) error { +func (d *dnSetDefaulter) Default(ctx context.Context, obj runtime.Object) error { dnSet, ok := obj.(*v1alpha1.DNSet) if !ok { return unexpectedKindError("DNSet", obj) } d.DefaultSpec(&dnSet.Spec) - return nil + return setDefaultOperatorVersion(ctx, &dnSet.Spec.PodSet) } func (d *dnSetDefaulter) DefaultSpec(spec *v1alpha1.DNSetSpec) { diff --git a/pkg/webhook/logset_webhook.go b/pkg/webhook/logset_webhook.go index c2566fbf..ddcfb23d 100644 --- a/pkg/webhook/logset_webhook.go +++ b/pkg/webhook/logset_webhook.go @@ -61,13 +61,13 @@ type logSetDefaulter struct{} var _ webhook.CustomDefaulter = &logSetDefaulter{} -func (l *logSetDefaulter) Default(_ context.Context, obj runtime.Object) error { +func (l *logSetDefaulter) Default(ctx context.Context, obj runtime.Object) error { logSet, ok := obj.(*v1alpha1.LogSet) if !ok { return unexpectedKindError("LogSet", obj) } l.DefaultSpec(&logSet.Spec) - return nil + return setDefaultOperatorVersion(ctx, &logSet.Spec.PodSet) } func (l *logSetDefaulter) DefaultSpec(spec *v1alpha1.LogSetSpec) { diff --git a/pkg/webhook/matrixonecluster_webhook.go b/pkg/webhook/matrixonecluster_webhook.go index 5668f026..60a175fe 100644 --- a/pkg/webhook/matrixonecluster_webhook.go +++ b/pkg/webhook/matrixonecluster_webhook.go @@ -17,6 +17,8 @@ package webhook import ( "context" "fmt" + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/utils/pointer" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation" @@ -68,7 +70,7 @@ type matrixOneClusterDefaulter struct { var _ webhook.CustomDefaulter = &matrixOneClusterDefaulter{} -func (m *matrixOneClusterDefaulter) Default(_ context.Context, obj runtime.Object) error { +func (m *matrixOneClusterDefaulter) Default(ctx context.Context, obj runtime.Object) error { moc, ok := obj.(*v1alpha1.MatrixOneCluster) if !ok { return unexpectedKindError("MatrixOneCluster", obj) @@ -89,6 +91,13 @@ func (m *matrixOneClusterDefaulter) Default(_ context.Context, obj runtime.Objec for i := range moc.Spec.CNGroups { m.cn.DefaultSpec(&moc.Spec.CNGroups[i].CNSetSpec) } + req, err := admission.RequestFromContext(ctx) + if err != nil { + return err + } + if req.AdmissionRequest.Operation == admissionv1.Create && moc.Spec.OperatorVersion == nil { + moc.Spec.OperatorVersion = pointer.String(v1alpha1.LatestOpVersion.String()) + } return nil } diff --git a/pkg/webhook/proxy_webhook.go b/pkg/webhook/proxy_webhook.go index 9fb94adf..de2d55b4 100644 --- a/pkg/webhook/proxy_webhook.go +++ b/pkg/webhook/proxy_webhook.go @@ -16,7 +16,6 @@ package webhook import ( "context" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -42,13 +41,13 @@ type proxySetDefaulter struct{} var _ webhook.CustomDefaulter = &proxySetDefaulter{} -func (p *proxySetDefaulter) Default(_ context.Context, obj runtime.Object) error { +func (p *proxySetDefaulter) Default(ctx context.Context, obj runtime.Object) error { proxySet, ok := obj.(*v1alpha1.ProxySet) if !ok { return unexpectedKindError("ProxySet", obj) } p.DefaultSpec(&proxySet.Spec) - return nil + return setDefaultOperatorVersion(ctx, &proxySet.Spec.PodSet) } func (p *proxySetDefaulter) DefaultSpec(spec *v1alpha1.ProxySetSpec) { diff --git a/pkg/webhook/utils.go b/pkg/webhook/utils.go index eedc2e47..ddd969ed 100644 --- a/pkg/webhook/utils.go +++ b/pkg/webhook/utils.go @@ -15,13 +15,18 @@ package webhook import ( + "context" "fmt" + "github.com/go-errors/errors" - "github.com/matrixorigin/matrixone-operator/api/core/v1alpha1" + admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "github.com/matrixorigin/matrixone-operator/api/core/v1alpha1" ) // DefaultArgs alias to v1alpha1.DefaultArgs @@ -64,7 +69,6 @@ func setPodSetDefaults(s *v1alpha1.PodSet) { s.Overlay.Env = appendIfNotExist(s.Overlay.Env, corev1.EnvVar{Name: v1alpha1.EnvGoDebug, Value: v1alpha1.DefaultGODebug}, func(v corev1.EnvVar) string { return v.Name }) - if s.ExportToPrometheus != nil && *s.ExportToPrometheus { if s.PromDiscoveryScheme == nil { s.PromDiscoveryScheme = (*v1alpha1.PromDiscoveryScheme)(pointer.String(string(v1alpha1.PromDiscoverySchemeService))) @@ -90,3 +94,14 @@ func defaultDiskCacheSize(total *resource.Quantity) *resource.Quantity { func unexpectedKindError(expected string, obj runtime.Object) error { return errors.Errorf("expected %s but received %T", expected, obj) } + +func setDefaultOperatorVersion(ctx context.Context, podSet *v1alpha1.PodSet) error { + req, err := admission.RequestFromContext(ctx) + if err != nil { + return err + } + if req.AdmissionRequest.Operation == admissionv1.Create && podSet.OperatorVersion == nil { + podSet.OperatorVersion = pointer.String(v1alpha1.LatestOpVersion.String()) + } + return nil +} diff --git a/test/e2e/claim_test.go b/test/e2e/claim_test.go index bf857b0d..ff4b33fd 100644 --- a/test/e2e/claim_test.go +++ b/test/e2e/claim_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "time" @@ -116,6 +117,7 @@ var _ = Describe("CNClaim and CNPool test", func() { MainContainer: v1alpha1.MainContainer{ Image: fmt.Sprintf("%s:%s", moImageRepo, moVersion), }, + OperatorVersion: pointer.String(v1alpha1.LatestOpVersion.String()), }, }, Deps: v1alpha1.CNSetDeps{ diff --git a/test/e2e/logset_test.go b/test/e2e/logset_test.go index 05fe4e62..ca2bb250 100644 --- a/test/e2e/logset_test.go +++ b/test/e2e/logset_test.go @@ -156,6 +156,122 @@ var _ = Describe("MatrixOneCluster test", func() { }, teardownClusterTimeout, pollInterval).Should(Succeed()) }) + It("Should restart log pod inplace when only config changed", func() { + By("Create logset") + pull := corev1.PullIfNotPresent + l := &v1alpha1.LogSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: env.Namespace, + Name: "log-" + rand.String(6), + }, + Spec: v1alpha1.LogSetSpec{ + PodSet: v1alpha1.PodSet{ + Replicas: 3, + MainContainer: v1alpha1.MainContainer{ + Image: fmt.Sprintf("%s:%s", moImageRepo, moVersion), + }, + Overlay: &v1alpha1.Overlay{ + MainContainerOverlay: v1alpha1.MainContainerOverlay{ + ImagePullPolicy: &pull, + }, + }, + }, + Volume: v1alpha1.Volume{ + Size: resource.MustParse("100Mi"), + }, + SharedStorage: v1alpha1.SharedStorageProvider{ + FileSystem: &v1alpha1.FileSystemProvider{ + Path: "/test", + }, + }, + StoreFailureTimeout: &metav1.Duration{Duration: 2 * time.Minute}, + }, + } + Expect(kubeCli.Create(ctx, l)).To(Succeed()) + Eventually(func() error { + if err := kubeCli.Get(ctx, client.ObjectKeyFromObject(l), l); err != nil { + logger.Errorw("error get logset status", "logset", l.Name, "error", err) + return err + } + if !recon.IsReady(&l.Status.ConditionalStatus) { + logger.Infow("wait logset ready", "logset", l.Name) + return errWait + } + return nil + }, createLogSetTimeout, pollInterval).Should(Succeed()) + + By("Logset update config") + oldPodList := &corev1.PodList{} + Expect(kubeCli.List(ctx, oldPodList, client.MatchingLabels(map[string]string{common.InstanceLabelKey: l.Name}))).To(Succeed()) + oldPodNameToUID := map[string]types.UID{} + var oldSuffix string + for _, pod := range oldPodList.Items { + oldSuffix = pod.Annotations[common.ConfigSuffixAnno] + oldPodNameToUID[pod.Name] = pod.UID + } + newConfig := l.Spec.Config.DeepCopy() + if newConfig == nil { + newConfig = v1alpha1.NewTomlConfig(map[string]interface{}{}) + } + Expect(e2eutil.Patch(ctx, kubeCli, l, func() error { + newConfig.Set([]string{"observability.labelSelector", "role"}, "__MO_UPDATE_TESTER__") + l.Spec.Config = newConfig + return nil + })).To(Succeed()) + Eventually(func() error { + podList := &corev1.PodList{} + clone := l.DeepCopy() + if err := kubeCli.Get(ctx, client.ObjectKeyFromObject(l), clone); err != nil { + logger.Errorw("error get logset status", "logset", l.Name, "error", err) + } + if !recon.IsReady(&clone.Status.ConditionalStatus) { + logger.Infow("wait logset ready", "logset", l.Name) + return errWait + } + if err := kubeCli.List(ctx, podList, client.MatchingLabels(map[string]string{common.InstanceLabelKey: l.Name})); err != nil { + logger.Errorw("error list pods", "logset", l.Name, "error", err) + return err + } + // expect pods not change, but restarted + for _, pod := range podList.Items { + if uid, ok := oldPodNameToUID[pod.Name]; ok && uid == pod.UID && + oldSuffix != pod.Annotations[common.ConfigSuffixAnno] { + continue + } + logger.Infow("waiting pod update", "pod", pod.Name) + return errWait + } + return nil + }, createLogSetTimeout, pollInterval).Should(Succeed()) + + By("Teardown logset") + Expect(kubeCli.Delete(ctx, l)).To(Succeed()) + Eventually(func() error { + err := kubeCli.Get(ctx, client.ObjectKeyFromObject(l), l) + if err == nil { + logger.Infow("wait logset teardown", "logset", l.Name) + return errWait + } + if !apierrors.IsNotFound(err) { + logger.Errorw("unexpected error when get logset", "logset", l, "error", err) + return err + } + podList := &corev1.PodList{} + err = kubeCli.List(ctx, podList, client.InNamespace(l.Namespace)) + if err != nil { + logger.Errorw("error list pods", "error", err) + return err + } + for _, pod := range podList.Items { + if strings.HasPrefix(pod.Name, l.Name) { + logger.Infow("Pod that belongs to the logset is not cleaned", "pod", pod.Name) + return errWait + } + } + return nil + }, teardownClusterTimeout, pollInterval).Should(Succeed()) + }) + It("Should start logset service successfully when logset replicas is 1", func() { By("Create logset") l := &v1alpha1.LogSet{