diff --git a/Dockerfile b/Dockerfile index c40737fe..a45217ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ COPY internal/k8s internal/k8s COPY internal/temporal internal/temporal COPY internal/controller internal/controller COPY internal/planner internal/planner +COPY internal/defaults internal/defaults # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command diff --git a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml index ebd661d1..b3778211 100644 --- a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml @@ -76,6 +76,10 @@ spec: required: - strategy type: object + maxVersions: + format: int32 + minimum: 1 + type: integer minReadySeconds: format: int32 type: integer @@ -3925,6 +3929,9 @@ spec: versionConflictToken: format: byte type: string + versionCount: + format: int32 + type: integer required: - targetVersion type: object diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index cd83b383..5280e6df 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -77,7 +77,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Check if we need to force manual strategy due to external modification rolloutStrategy := w.Spec.RolloutStrategy - if w.Status.LastModifierIdentity != controllerIdentity { + if w.Status.LastModifierIdentity != controllerIdentity && w.Status.LastModifierIdentity != "" { l.Info("Forcing manual rollout strategy since deployment was modified externally") rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual } diff --git a/internal/controller/genstatus.go b/internal/controller/genstatus.go index 0473ccc3..9f449a99 100644 --- a/internal/controller/genstatus.go +++ b/internal/controller/genstatus.go @@ -50,6 +50,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus( workerDeploymentName, targetVersionID, workerDeploy, + temporalState, ) if err != nil { l.Error(err, "error getting test workflow status") diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 0bebdeb0..ecbbc914 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -39,6 +39,8 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork status.TargetVersion = m.mapTargetWorkerDeploymentVersion(targetVersionID) if m.temporalState.RampingVersionID == targetVersionID { status.TargetVersion.RampingSince = m.temporalState.RampingSince + // TODO(Shivam): Temporal server is not emitting the right value for RampLastModifiedAt. + // This is going to be fixed by https://github.com/temporalio/temporal/pull/8089. status.TargetVersion.RampLastModifiedAt = m.temporalState.RampLastModifiedAt rampPercentage := m.temporalState.RampPercentage status.TargetVersion.RampPercentage = &rampPercentage diff --git a/internal/demo/Dockerfile b/internal/demo/Dockerfile index 5de4eaac..03f0b6a8 100644 --- a/internal/demo/Dockerfile +++ b/internal/demo/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.23 as builder +FROM golang:1.24 as builder ARG TARGETOS ARG TARGETARCH ARG WORKER diff --git a/internal/demo/helloworld/temporal_worker_deployment.yaml b/internal/demo/helloworld/temporal_worker_deployment.yaml index 0ad86729..264862bf 100644 --- a/internal/demo/helloworld/temporal_worker_deployment.yaml +++ b/internal/demo/helloworld/temporal_worker_deployment.yaml @@ -17,14 +17,14 @@ spec: strategy: Progressive steps: - rampPercentage: 1 - pauseDuration: 5s + pauseDuration: 30s - rampPercentage: 5 - pauseDuration: 5s + pauseDuration: 30s - rampPercentage: 10 - pauseDuration: 5s + pauseDuration: 30s # Increase traffic to 50% and wait 1 minute - rampPercentage: 50 - pauseDuration: 1m + pauseDuration: 30s gate: workflowType: "HelloWorld" sunset: diff --git a/internal/k8s/deployments.go b/internal/k8s/deployments.go index c286da5b..3fb55cb4 100644 --- a/internal/k8s/deployments.go +++ b/internal/k8s/deployments.go @@ -7,14 +7,15 @@ package k8s import ( "context" "fmt" + "regexp" + "sort" + "strings" + "github.com/distribution/reference" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "regexp" "sigs.k8s.io/controller-runtime/pkg/client" - "sort" - "strings" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils" diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 51a42b04..6aa7ebe4 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -240,7 +240,7 @@ func getTestWorkflows( if _, ok := taskQueuesWithWorkflows[tq.Name]; !ok { testWorkflows = append(testWorkflows, WorkflowConfig{ WorkflowType: config.RolloutStrategy.Gate.WorkflowType, - WorkflowID: getTestWorkflowID(tq.Name, targetVersion.VersionID), + WorkflowID: temporal.GetTestWorkflowID(targetVersion.VersionID, tq.Name), VersionID: targetVersion.VersionID, TaskQueue: tq.Name, }) @@ -250,11 +250,6 @@ func getTestWorkflows( return testWorkflows } -// getTestWorkflowID generates an ID for a test workflow -func getTestWorkflowID(taskQueue, versionID string) string { - return "test-" + versionID + "-" + taskQueue -} - // getVersionConfigDiff determines the version configuration based on the rollout strategy func getVersionConfigDiff( l logr.Logger, @@ -360,13 +355,15 @@ func handleProgressiveRollout( } // Move to the next step if it has been long enough since the last update - if rampLastModifiedAt.Add(currentStep.PauseDuration.Duration).Before(currentTime) { - if i < len(steps)-1 { - vcfg.RampPercentage = steps[i+1].RampPercentage - return vcfg - } else { - vcfg.SetCurrent = true - return vcfg + if rampLastModifiedAt != nil { + if rampLastModifiedAt.Add(currentStep.PauseDuration.Duration).Before(currentTime) { + if i < len(steps)-1 { + vcfg.RampPercentage = steps[i+1].RampPercentage + return vcfg + } else { + vcfg.SetCurrent = true + return vcfg + } } } diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index aaf85d2e..295c5761 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -1163,6 +1163,40 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { expectRampPercent: 0, expectSetCurrent: true, }, + "nil rampLastModifiedAt should not cause a panic": { + strategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + { + RampPercentage: 10, + PauseDuration: metav1.Duration{Duration: 30 * time.Second}, + }, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.123", + Status: temporaliov1alpha1.VersionStatusCurrent, + }, + }, + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.456", + Status: temporaliov1alpha1.VersionStatusRamping, + HealthySince: &metav1.Time{Time: time.Now()}, + }, + RampingSince: &metav1.Time{ + Time: time.Now().Add(-2*time.Hour - 1*time.Second), + }, + RampPercentage: float32Ptr(10), + RampLastModifiedAt: nil, // nil rampLastModifiedAt should not cause a panic! + }, + }, + expectConfig: false, + expectRampPercent: 0, + expectSetCurrent: false, + }, } for name, tc := range testCases { @@ -1677,7 +1711,7 @@ func TestGetTestWorkflowID(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - id := getTestWorkflowID(tc.taskQueue, tc.versionID) + id := temporal.GetTestWorkflowID(tc.versionID, tc.taskQueue) assert.Equal(t, tc.expectID, id, "unexpected workflow ID") }) } diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 247c6296..7556c747 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -82,7 +82,7 @@ func GetWorkerDeploymentState( if routingConfig.RampingVersion != "" { var ( rampingSinceTime = metav1.NewTime(routingConfig.RampingVersionChangedTime) - lastRampUpdateTime = metav1.NewTime(workerDeploymentInfo.RoutingConfig.RampingVersionPercentageChangedTime) + lastRampUpdateTime = metav1.NewTime(routingConfig.RampingVersionPercentageChangedTime) ) state.RampingSince = &rampingSinceTime state.RampLastModifiedAt = &lastRampUpdateTime @@ -130,6 +130,7 @@ func GetTestWorkflowStatus( workerDeploymentName string, versionID string, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + temporalState *TemporalWorkerState, ) ([]temporaliov1alpha1.WorkflowExecution, error) { var results []temporaliov1alpha1.WorkflowExecution @@ -154,8 +155,13 @@ func GetTestWorkflowStatus( continue } + // Adding task queue information to the current temporal state + temporalState.Versions[versionID].TaskQueues = append(temporalState.Versions[versionID].TaskQueues, temporaliov1alpha1.TaskQueue{ + Name: tq.Name, + }) + // Check if there is a test workflow for this task queue - testWorkflowID := getTestWorkflowID(workerDeploymentName, tq.Name, versionID) + testWorkflowID := GetTestWorkflowID(versionID, tq.Name) wf, err := client.DescribeWorkflowExecution( ctx, testWorkflowID, @@ -206,7 +212,7 @@ func mapWorkflowStatus(status enums.WorkflowExecutionStatus) temporaliov1alpha1. } } -// getTestWorkflowID generates a consistent ID for test workflows -func getTestWorkflowID(deploymentName, taskQueue, versionID string) string { - return fmt.Sprintf("test-%s-%s-%s", deploymentName, taskQueue, versionID) +// GetTestWorkflowID generates a workflowID for test workflows +func GetTestWorkflowID(versionID, taskQueue string) string { + return fmt.Sprintf("test-%s-%s", versionID, taskQueue) } diff --git a/internal/temporal/worker_deployment_test.go b/internal/temporal/worker_deployment_test.go index 407fb937..c55803d1 100644 --- a/internal/temporal/worker_deployment_test.go +++ b/internal/temporal/worker_deployment_test.go @@ -82,20 +82,20 @@ func TestGetTestWorkflowID(t *testing.T) { deploymentName: "worker", taskQueue: "queue1", versionID: "worker.v1", - expected: "test-worker-queue1-worker.v1", + expected: "test-worker.v1-queue1", }, { name: "with dots", deploymentName: "worker.app", taskQueue: "queue.main", versionID: "worker.app.v2", - expected: "test-worker.app-queue.main-worker.app.v2", + expected: "test-worker.app.v2-queue.main", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - id := getTestWorkflowID(tt.deploymentName, tt.taskQueue, tt.versionID) + id := GetTestWorkflowID(tt.versionID, tt.taskQueue) assert.Equal(t, tt.expected, id) }) }