Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,15 @@ type TemporalWorkerDeploymentStatus struct {
// TargetVersion is the desired next version. If TargetVersion.Deployment is nil,
// then the controller should create it. If not nil, the controller should
// wait for it to become healthy and then move it to the CurrentVersion.
// This must never be nil.
// +kubebuilder:validation:Required
TargetVersion *TargetWorkerDeploymentVersion `json:"targetVersion"`

// CurrentVersion is the version that is currently registered with
// Temporal as the current version of its worker deployment. This will be nil
// during initial bootstrap until a version is registered and set as current.
CurrentVersion *CurrentWorkerDeploymentVersion `json:"currentVersion,omitempty"`

// RampingVersion is the version that is currently registered with
// Temporal as the ramping version of its worker deployment. The controller
// should ensure that this is always equal to the TargetVersion, or, if the
// TargetVersion has been promoted to the current version, this should be nil.
RampingVersion *TargetWorkerDeploymentVersion `json:"rampingVersion,omitempty"`

// DeprecatedVersions are deployment versions that are no longer the default. Any
// deployment versions that are unreachable should be deleted by the controller.
DeprecatedVersions []*DeprecatedWorkerDeploymentVersion `json:"deprecatedVersions,omitempty"`
Expand Down
11 changes: 5 additions & 6 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3854,74 +3854,6 @@ spec:
type: array
lastModifierIdentity:
type: string
rampingVersion:
properties:
deployment:
properties:
apiVersion:
type: string
fieldPath:
type: string
kind:
type: string
name:
type: string
namespace:
type: string
resourceVersion:
type: string
uid:
type: string
type: object
x-kubernetes-map-type: atomic
healthySince:
format: date-time
type: string
managedBy:
type: string
rampLastModifiedAt:
format: date-time
type: string
rampPercentage:
type: number
rampingSince:
format: date-time
type: string
status:
type: string
taskQueues:
items:
properties:
name:
type: string
required:
- name
type: object
type: array
testWorkflows:
items:
properties:
runID:
type: string
status:
type: string
taskQueue:
type: string
workflowID:
type: string
required:
- runID
- status
- taskQueue
- workflowID
type: object
type: array
versionID:
type: string
required:
- status
- versionID
type: object
targetVersion:
properties:
deployment:
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/planner"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
)

// plan holds the actions to execute during reconciliation
Expand Down Expand Up @@ -50,6 +51,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
l logr.Logger,
w *temporaliov1alpha1.TemporalWorkerDeployment,
connection temporaliov1alpha1.TemporalConnectionSpec,
temporalState *temporal.TemporalWorkerState,
) (*plan, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(w)
targetVersionID := k8s.ComputeVersionID(w)
Expand Down Expand Up @@ -82,17 +84,15 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(

// Generate the plan using the planner package
plannerConfig := &planner.Config{
Status: &w.Status,
Spec: &w.Spec,
RolloutStrategy: rolloutStrategy,
TargetVersionID: targetVersionID,
Replicas: *w.Spec.Replicas,
ConflictToken: w.Status.VersionConflictToken,
}

planResult, err := planner.GeneratePlan(
l,
k8sState,
&w.Status,
&w.Spec,
temporalState,
plannerConfig,
)
if err != nil {
Expand Down
17 changes: 6 additions & 11 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
temporalClient temporalClient.Client,
req ctrl.Request,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
temporalState *temporal.TemporalWorkerState,
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
targetVersionID := k8s.ComputeVersionID(workerDeploy)
Expand All @@ -41,17 +42,6 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
}

// Fetch Temporal worker deployment state
temporalState, err := temporal.GetWorkerDeploymentState(
ctx,
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
)
if err != nil {
return nil, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
}

// Fetch test workflow status for the desired version
if targetVersionID != temporalState.CurrentVersionID {
testWorkflows, err := temporal.GetTestWorkflowStatus(
Expand All @@ -76,5 +66,10 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
stateMapper := newStateMapper(k8sState, temporalState)
status := stateMapper.mapToStatus(targetVersionID)

// Validate that TargetVersion is never nil (defensive programming)
if status.TargetVersion == nil {
return nil, fmt.Errorf("generated status has nil TargetVersion for target version %s, this should never happen", targetVersionID)
}

return status, nil
}
19 changes: 3 additions & 16 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,25 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork

// Set current version
currentVersionID := m.temporalState.CurrentVersionID
if currentVersionID != "" {
status.CurrentVersion = m.mapCurrentWorkerDeploymentVersion(currentVersionID)
}
status.CurrentVersion = m.mapCurrentWorkerDeploymentVersion(currentVersionID)

// Set target version (desired version)
status.TargetVersion = m.mapTargetWorkerDeploymentVersion(targetVersionID)
if status.TargetVersion != nil && m.temporalState.RampingVersionID == targetVersionID {
if m.temporalState.RampingVersionID == targetVersionID {
status.TargetVersion.RampingSince = m.temporalState.RampingSince
status.TargetVersion.RampLastModifiedAt = m.temporalState.RampLastModifiedAt
rampPercentage := m.temporalState.RampPercentage
status.TargetVersion.RampPercentage = &rampPercentage
}

rampingVersionID := m.temporalState.RampingVersionID
// Set ramping version if it exists
if rampingVersionID != "" {
status.RampingVersion = m.mapTargetWorkerDeploymentVersion(rampingVersionID)
}

// Add deprecated versions
var deprecatedVersions []*v1alpha1.DeprecatedWorkerDeploymentVersion
for versionID := range m.k8sState.Deployments {
// Skip current and target versions
if versionID == currentVersionID || versionID == targetVersionID || versionID == rampingVersionID {
if versionID == currentVersionID || versionID == targetVersionID {
continue
}

// TODO(rob): We should never see a version here that has VersionStatusCurrent, but should we check?
versionStatus := m.mapDeprecatedWorkerDeploymentVersion(versionID)
if versionStatus != nil {
deprecatedVersions = append(deprecatedVersions, versionStatus)
Expand Down Expand Up @@ -108,10 +99,6 @@ func (m *stateMapper) mapCurrentWorkerDeploymentVersion(versionID string) *v1alp

// mapTargetWorkerDeploymentVersion creates a target version status from the states
func (m *stateMapper) mapTargetWorkerDeploymentVersion(versionID string) *v1alpha1.TargetWorkerDeploymentVersion {
if versionID == "" {
return nil
}

version := &v1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: v1alpha1.BaseWorkerDeploymentVersion{
VersionID: versionID,
Expand Down
18 changes: 16 additions & 2 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
)

var (
Expand Down Expand Up @@ -124,8 +126,20 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
temporalClient = c
}

// Fetch Temporal worker deployment state
workerDeploymentName := k8s.ComputeWorkerDeploymentName(&workerDeploy)
temporalState, err := temporal.GetWorkerDeploymentState(
ctx,
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
}

// Compute a new status from k8s and temporal state
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy)
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -151,7 +165,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
}

// Generate a plan to get to desired spec from current status
plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec)
plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec, temporalState)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Loading