Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY internal/k8s internal/k8s
COPY internal/temporal internal/temporal
COPY internal/controller internal/controller
COPY internal/planner internal/planner
COPY internal/defaults internal/defaults

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ spec:
required:
- strategy
type: object
maxVersions:
format: int32
minimum: 1
type: integer
minReadySeconds:
format: int32
type: integer
Expand Down Expand Up @@ -3925,6 +3929,9 @@ spec:
versionConflictToken:
format: byte
type: string
versionCount:
format: int32
type: integer
required:
- targetVersion
type: object
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(

// Check if we need to force manual strategy due to external modification
rolloutStrategy := w.Spec.RolloutStrategy
if w.Status.LastModifierIdentity != controllerIdentity {
if w.Status.LastModifierIdentity != controllerIdentity && w.Status.LastModifierIdentity != "" {
l.Info("Forcing manual rollout strategy since deployment was modified externally")
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
workerDeploymentName,
targetVersionID,
workerDeploy,
temporalState,
)
if err != nil {
l.Error(err, "error getting test workflow status")
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork
status.TargetVersion = m.mapTargetWorkerDeploymentVersion(targetVersionID)
if m.temporalState.RampingVersionID == targetVersionID {
status.TargetVersion.RampingSince = m.temporalState.RampingSince
// TODO(Shivam): Temporal server is not emitting the right value for RampLastModifiedAt.
// This is going to be fixed by https://github.com/temporalio/temporal/pull/8089.
status.TargetVersion.RampLastModifiedAt = m.temporalState.RampLastModifiedAt
rampPercentage := m.temporalState.RampPercentage
status.TargetVersion.RampPercentage = &rampPercentage
Expand Down
2 changes: 1 addition & 1 deletion internal/demo/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.23 as builder
FROM golang:1.24 as builder
ARG TARGETOS
ARG TARGETARCH
ARG WORKER
Expand Down
8 changes: 4 additions & 4 deletions internal/demo/helloworld/temporal_worker_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ spec:
strategy: Progressive
steps:
- rampPercentage: 1
pauseDuration: 5s
pauseDuration: 30s
- rampPercentage: 5
pauseDuration: 5s
pauseDuration: 30s
- rampPercentage: 10
pauseDuration: 5s
pauseDuration: 30s
# Increase traffic to 50% and wait 1 minute
- rampPercentage: 50
pauseDuration: 1m
pauseDuration: 30s
gate:
workflowType: "HelloWorld"
sunset:
Expand Down
7 changes: 4 additions & 3 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ package k8s
import (
"context"
"fmt"
"regexp"
"sort"
"strings"

"github.com/distribution/reference"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/client"
"sort"
"strings"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils"
Expand Down
23 changes: 10 additions & 13 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func getTestWorkflows(
if _, ok := taskQueuesWithWorkflows[tq.Name]; !ok {
testWorkflows = append(testWorkflows, WorkflowConfig{
WorkflowType: config.RolloutStrategy.Gate.WorkflowType,
WorkflowID: getTestWorkflowID(tq.Name, targetVersion.VersionID),
WorkflowID: temporal.GetTestWorkflowID(targetVersion.VersionID, tq.Name),
VersionID: targetVersion.VersionID,
TaskQueue: tq.Name,
})
Expand All @@ -250,11 +250,6 @@ func getTestWorkflows(
return testWorkflows
}

// getTestWorkflowID generates an ID for a test workflow
func getTestWorkflowID(taskQueue, versionID string) string {
return "test-" + versionID + "-" + taskQueue
}

// getVersionConfigDiff determines the version configuration based on the rollout strategy
func getVersionConfigDiff(
l logr.Logger,
Expand Down Expand Up @@ -360,13 +355,15 @@ func handleProgressiveRollout(
}

// Move to the next step if it has been long enough since the last update
if rampLastModifiedAt.Add(currentStep.PauseDuration.Duration).Before(currentTime) {
if i < len(steps)-1 {
vcfg.RampPercentage = steps[i+1].RampPercentage
return vcfg
} else {
vcfg.SetCurrent = true
return vcfg
if rampLastModifiedAt != nil {
if rampLastModifiedAt.Add(currentStep.PauseDuration.Duration).Before(currentTime) {
if i < len(steps)-1 {
vcfg.RampPercentage = steps[i+1].RampPercentage
return vcfg
} else {
vcfg.SetCurrent = true
return vcfg
}
}
}

Expand Down
36 changes: 35 additions & 1 deletion internal/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,40 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) {
expectRampPercent: 0,
expectSetCurrent: true,
},
"nil rampLastModifiedAt should not cause a panic": {
strategy: temporaliov1alpha1.RolloutStrategy{
Strategy: temporaliov1alpha1.UpdateProgressive,
Steps: []temporaliov1alpha1.RolloutStep{
{
RampPercentage: 10,
PauseDuration: metav1.Duration{Duration: 30 * time.Second},
},
},
},
status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
VersionID: "test/namespace.123",
Status: temporaliov1alpha1.VersionStatusCurrent,
},
},
TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
VersionID: "test/namespace.456",
Status: temporaliov1alpha1.VersionStatusRamping,
HealthySince: &metav1.Time{Time: time.Now()},
},
RampingSince: &metav1.Time{
Time: time.Now().Add(-2*time.Hour - 1*time.Second),
},
RampPercentage: float32Ptr(10),
RampLastModifiedAt: nil, // nil rampLastModifiedAt should not cause a panic!
},
},
expectConfig: false,
expectRampPercent: 0,
expectSetCurrent: false,
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -1677,7 +1711,7 @@ func TestGetTestWorkflowID(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
id := getTestWorkflowID(tc.taskQueue, tc.versionID)
id := temporal.GetTestWorkflowID(tc.versionID, tc.taskQueue)
assert.Equal(t, tc.expectID, id, "unexpected workflow ID")
})
}
Expand Down
16 changes: 11 additions & 5 deletions internal/temporal/worker_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func GetWorkerDeploymentState(
if routingConfig.RampingVersion != "" {
var (
rampingSinceTime = metav1.NewTime(routingConfig.RampingVersionChangedTime)
lastRampUpdateTime = metav1.NewTime(workerDeploymentInfo.RoutingConfig.RampingVersionPercentageChangedTime)
lastRampUpdateTime = metav1.NewTime(routingConfig.RampingVersionPercentageChangedTime)
)
state.RampingSince = &rampingSinceTime
state.RampLastModifiedAt = &lastRampUpdateTime
Expand Down Expand Up @@ -130,6 +130,7 @@ func GetTestWorkflowStatus(
workerDeploymentName string,
versionID string,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
temporalState *TemporalWorkerState,
) ([]temporaliov1alpha1.WorkflowExecution, error) {
var results []temporaliov1alpha1.WorkflowExecution

Expand All @@ -154,8 +155,13 @@ func GetTestWorkflowStatus(
continue
}

// Adding task queue information to the current temporal state
temporalState.Versions[versionID].TaskQueues = append(temporalState.Versions[versionID].TaskQueues, temporaliov1alpha1.TaskQueue{
Name: tq.Name,
})

// Check if there is a test workflow for this task queue
testWorkflowID := getTestWorkflowID(workerDeploymentName, tq.Name, versionID)
testWorkflowID := GetTestWorkflowID(versionID, tq.Name)
wf, err := client.DescribeWorkflowExecution(
ctx,
testWorkflowID,
Expand Down Expand Up @@ -206,7 +212,7 @@ func mapWorkflowStatus(status enums.WorkflowExecutionStatus) temporaliov1alpha1.
}
}

// getTestWorkflowID generates a consistent ID for test workflows
func getTestWorkflowID(deploymentName, taskQueue, versionID string) string {
return fmt.Sprintf("test-%s-%s-%s", deploymentName, taskQueue, versionID)
// GetTestWorkflowID generates a workflowID for test workflows
func GetTestWorkflowID(versionID, taskQueue string) string {
return fmt.Sprintf("test-%s-%s", versionID, taskQueue)
}
6 changes: 3 additions & 3 deletions internal/temporal/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,20 @@ func TestGetTestWorkflowID(t *testing.T) {
deploymentName: "worker",
taskQueue: "queue1",
versionID: "worker.v1",
expected: "test-worker-queue1-worker.v1",
expected: "test-worker.v1-queue1",
},
{
name: "with dots",
deploymentName: "worker.app",
taskQueue: "queue.main",
versionID: "worker.app.v2",
expected: "test-worker.app-queue.main-worker.app.v2",
expected: "test-worker.app.v2-queue.main",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
id := getTestWorkflowID(tt.deploymentName, tt.taskQueue, tt.versionID)
id := GetTestWorkflowID(tt.versionID, tt.taskQueue)
assert.Equal(t, tt.expected, id)
})
}
Expand Down