From 21fee2317fa1d7e889cc4352fa9f13f4a2668f06 Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Tue, 25 Jun 2024 10:15:00 +0800 Subject: [PATCH] skip job check run status in workflow's deploy job Signed-off-by: Patrick Zhao --- .../aslan/core/common/service/kube/render.go | 4 +++ .../core/common/service/kube/workloads.go | 15 ++++++++--- .../jobcontroller/job_deploy.go | 25 +++++++++++++++++-- pkg/microservice/aslan/core/vm/service/log.go | 8 +++--- pkg/tool/kube/updater/job.go | 15 +++++++++++ 5 files changed, 58 insertions(+), 9 deletions(-) diff --git a/pkg/microservice/aslan/core/common/service/kube/render.go b/pkg/microservice/aslan/core/common/service/kube/render.go index c42043cc87..27d4d61db2 100644 --- a/pkg/microservice/aslan/core/common/service/kube/render.go +++ b/pkg/microservice/aslan/core/common/service/kube/render.go @@ -183,6 +183,10 @@ func ReplaceWorkloadImages(rawYaml string, images []*commonmodels.Container) (st if err := decoder.Decode(job); err != nil { return "", nil, fmt.Errorf("unmarshal Job error: %v", err) } + workloadRes = append(workloadRes, &WorkloadResource{ + Name: resKind.Metadata.Name, + Type: resKind.Kind, + }) for i, container := range job.Spec.Template.Spec.Containers { containerName := container.Name if image, ok := imageMap[containerName]; ok { diff --git a/pkg/microservice/aslan/core/common/service/kube/workloads.go b/pkg/microservice/aslan/core/common/service/kube/workloads.go index 3922959855..88ec84e3ab 100644 --- a/pkg/microservice/aslan/core/common/service/kube/workloads.go +++ b/pkg/microservice/aslan/core/common/service/kube/workloads.go @@ -30,15 +30,16 @@ import ( ) func FetchSelectedWorkloads(namespace string, Resource []*WorkloadResource, kubeclient crClient.Client, clientSet *kubernetes.Clientset) ([]*appsv1.Deployment, []*appsv1.StatefulSet, - []*batchv1.CronJob, []*batchv1beta1.CronJob, error) { + []*batchv1.CronJob, []*batchv1beta1.CronJob, []*batchv1.Job, error) { var deployments []*appsv1.Deployment var statefulSets []*appsv1.StatefulSet var cronJobs []*batchv1.CronJob var betaCronJobs []*batchv1beta1.CronJob + var jobs []*batchv1.Job k8sServerVersion, err := clientSet.Discovery().ServerVersion() if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } for _, item := range Resource { @@ -70,7 +71,15 @@ func FetchSelectedWorkloads(namespace string, Resource []*WorkloadResource, kube if cronjobBeta != nil && cronjobExists { betaCronJobs = append(betaCronJobs, cronjobBeta) } + case setting.Job: + job, jobExists, err := getter.GetJob(namespace, item.Name, kubeclient) + if jobExists && err == nil { + jobs = append(jobs, job) + } + if err != nil { + log.Errorf("failed to fetch job %s, error: %v", item.Name, err) + } } } - return deployments, statefulSets, cronJobs, betaCronJobs, nil + return deployments, statefulSets, cronJobs, betaCronJobs, jobs, nil } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go index 5a97b28b3c..a677692777 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_deploy.go @@ -303,7 +303,7 @@ func (c *DeployJobCtl) updateSystemService(env *commonmodels.Product, currentYam c.jobTaskSpec.RelatedPodLabels = append(c.jobTaskSpec.RelatedPodLabels, podLabels) } c.jobTaskSpec.ReplaceResources = append(c.jobTaskSpec.ReplaceResources, commonmodels.Resource{Name: us.GetName(), Kind: us.GetKind()}) - case setting.CronJob: + case setting.CronJob, setting.Job: c.jobTaskSpec.ReplaceResources = append(c.jobTaskSpec.ReplaceResources, commonmodels.Resource{Name: us.GetName(), Kind: us.GetKind()}) } } @@ -314,7 +314,7 @@ func (c *DeployJobCtl) updateExternalServiceModule(ctx context.Context, resource var err error var replaced bool - deployments, statefulSets, cronJobs, betaCronJobs, err := kube.FetchSelectedWorkloads(env.Namespace, resources, c.kubeClient, c.clientSet) + deployments, statefulSets, cronJobs, betaCronJobs, jobs, err := kube.FetchSelectedWorkloads(env.Namespace, resources, c.kubeClient, c.clientSet) if err != nil { return err } @@ -399,6 +399,27 @@ BetaCronLoop: } } } +Job: + for _, job := range jobs { + for _, container := range job.Spec.Template.Spec.Containers { + if container.Name == serviceModule.ServiceModule { + return fmt.Errorf("job %s/%s/%s is not supported to update image", env.Namespace, job.Name, container.Name) + // err = updater.UpdateJobImage(job.Namespace, job.Name, serviceModule.ServiceModule, serviceModule.Image, c.kubeClient) + // if err != nil { + // return fmt.Errorf("failed to update container image in %s/job/%s/%s: %v", env.Namespace, job.Name, container.Name, err) + // } + // c.jobTaskSpec.ReplaceResources = append(c.jobTaskSpec.ReplaceResources, commonmodels.Resource{ + // Kind: setting.Job, + // Container: container.Name, + // Origin: container.Image, + // Name: job.Name, + // }) + // replaced = true + // c.jobTaskSpec.RelatedPodLabels = append(c.jobTaskSpec.RelatedPodLabels, job.Spec.Template.Labels) + break Job + } + } + } if !replaced { return fmt.Errorf("service %s container name %s is not found in env %s", c.jobTaskSpec.ServiceName, serviceModule.ServiceModule, c.jobTaskSpec.Env) diff --git a/pkg/microservice/aslan/core/vm/service/log.go b/pkg/microservice/aslan/core/vm/service/log.go index 87b682ec78..73be9a5b1f 100644 --- a/pkg/microservice/aslan/core/vm/service/log.go +++ b/pkg/microservice/aslan/core/vm/service/log.go @@ -65,13 +65,13 @@ func (v *VMJobStatusMap) Delete(key string) { cache.NewRedisCache(utilconfig.RedisCommonCacheTokenDB()).Delete(vmJobKey(key)) } -func savaVMJobLog(job *vmmodel.VMJob, log string, logger *zap.SugaredLogger) (err error) { +func savaVMJobLog(job *vmmodel.VMJob, logContent string, logger *zap.SugaredLogger) (err error) { if job.Status == string(config.StatusRunning) { VMJobStatus.Set(job.ID.Hex()) } var file string - if job != nil && job.LogFile == "" && log != "" { + if job != nil && job.LogFile == "" && logContent != "" { file, err = util.CreateVMJobLogFile(job.ID.Hex()) if err != nil { return fmt.Errorf("failed to generate tmp file, error: %s", err) @@ -81,8 +81,8 @@ func savaVMJobLog(job *vmmodel.VMJob, log string, logger *zap.SugaredLogger) (er file = job.LogFile } - if log != "" { - err = util.WriteFile(file, []byte(log), 0644) + if logContent != "" { + err = util.WriteFile(file, []byte(logContent), 0644) if err != nil { return fmt.Errorf("failed to write log to file, error: %s", err) } diff --git a/pkg/tool/kube/updater/job.go b/pkg/tool/kube/updater/job.go index 08d8ce4e5f..45910626f6 100644 --- a/pkg/tool/kube/updater/job.go +++ b/pkg/tool/kube/updater/job.go @@ -18,6 +18,7 @@ package updater import ( "context" + "fmt" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -74,3 +75,17 @@ func DeleteJobsAndWait(ns string, selector labels.Selector, cl client.Client) er } return deleteObjectsAndWait(ns, selector, &batchv1.Job{}, gvk, cl) } + +func PatchJob(ns, name string, patchBytes []byte, cl client.Client) error { + return patchObject(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + }, patchBytes, cl) +} + +func UpdateJobImage(ns, name, container, image string, cl client.Client) error { + patchBytes := []byte(fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, container, image)) + return PatchJob(ns, name, patchBytes, cl) +}