diff --git a/api/v1/cosmosfullnode_types.go b/api/v1/cosmosfullnode_types.go
index d536390a..062c4313 100644
--- a/api/v1/cosmosfullnode_types.go
+++ b/api/v1/cosmosfullnode_types.go
@@ -138,21 +138,14 @@ type FullNodeStatus struct {
 
 	// Current sync information. Collected every 60s.
 	// +optional
-	SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"`
+	SyncInfo map[string]*SyncInfoPodStatus `json:"sync,omitempty"`
 
 	// Latest Height information. collected when node starts up and when RPC is successfully queried.
 	// +optional
 	Height map[string]uint64 `json:"height,omitempty"`
 }
 
-type SyncInfoStatus struct {
-	// The latest consensus state of pods.
-	Pods []SyncInfoPodStatus `json:"pods"`
-}
-
 type SyncInfoPodStatus struct {
-	// Pod's name.
-	Pod string `json:"pod"`
 	// When consensus information was fetched.
 	Timestamp metav1.Time `json:"timestamp"`
 	// Latest height if no error encountered.
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go
index b528590a..b7e88e0a 100644
--- a/api/v1/zz_generated.deepcopy.go
+++ b/api/v1/zz_generated.deepcopy.go
@@ -320,8 +320,18 @@ func (in *FullNodeStatus) DeepCopyInto(out *FullNodeStatus) {
 	}
 	if in.SyncInfo != nil {
 		in, out := &in.SyncInfo, &out.SyncInfo
-		*out = new(SyncInfoStatus)
-		(*in).DeepCopyInto(*out)
+		*out = make(map[string]*SyncInfoPodStatus, len(*in))
+		for key, val := range *in {
+			var outVal *SyncInfoPodStatus
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				in, out := &val, &outVal
+				*out = new(SyncInfoPodStatus)
+				(*in).DeepCopyInto(*out)
+			}
+			(*out)[key] = outVal
+		}
 	}
 	if in.Height != nil {
 		in, out := &in.Height, &out.Height
@@ -762,25 +772,3 @@ func (in *SyncInfoPodStatus) DeepCopy() *SyncInfoPodStatus {
 	in.DeepCopyInto(out)
 	return out
 }
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *SyncInfoStatus) DeepCopyInto(out *SyncInfoStatus) {
-	*out = *in
-	if in.Pods != nil {
-		in, out := &in.Pods, &out.Pods
-		*out = make([]SyncInfoPodStatus, len(*in))
-		for i := range *in {
-			(*in)[i].DeepCopyInto(&(*out)[i])
-		}
-	}
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SyncInfoStatus.
-func (in *SyncInfoStatus) DeepCopy() *SyncInfoStatus {
-	if in == nil {
-		return nil
-	}
-	out := new(SyncInfoStatus)
-	in.DeepCopyInto(out)
-	return out
-}
diff --git a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
index 1b340d34..a76cf9d2 100644
--- a/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
+++ b/config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
@@ -6009,39 +6009,28 @@ spec:
               status:
                 description: A generic message for the user. May contain errors.
                 type: string
-              syncInfo:
+              sync:
+                additionalProperties:
+                  properties:
+                    error:
+                      description: Error message if unable to fetch consensus state.
+                      type: string
+                    height:
+                      description: Latest height if no error encountered.
+                      format: int64
+                      type: integer
+                    inSync:
+                      description: If the pod reports itself as in sync with chain
+                        tip.
+                      type: boolean
+                    timestamp:
+                      description: When consensus information was fetched.
+                      format: date-time
+                      type: string
+                  required:
+                  - timestamp
+                  type: object
                 description: Current sync information. Collected every 60s.
-                properties:
-                  pods:
-                    description: The latest consensus state of pods.
-                    items:
-                      properties:
-                        error:
-                          description: Error message if unable to fetch consensus
-                            state.
-                          type: string
-                        height:
-                          description: Latest height if no error encountered.
-                          format: int64
-                          type: integer
-                        inSync:
-                          description: If the pod reports itself as in sync with chain
-                            tip.
-                          type: boolean
-                        pod:
-                          description: Pod's name.
-                          type: string
-                        timestamp:
-                          description: When consensus information was fetched.
-                          format: date-time
-                          type: string
-                      required:
-                      - pod
-                      - timestamp
-                      type: object
-                    type: array
-                required:
-                - pods
                 type: object
             required:
             - observedGeneration
diff --git a/controllers/cosmosfullnode_controller.go b/controllers/cosmosfullnode_controller.go
index 221cecfe..f60d370a 100644
--- a/controllers/cosmosfullnode_controller.go
+++ b/controllers/cosmosfullnode_controller.go
@@ -118,7 +118,10 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	reporter := kube.NewEventReporter(logger, r.recorder, crd)
 
 	fullnode.ResetStatus(crd)
-	defer r.updateStatus(ctx, crd)
+
+	syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)
+
+	defer r.updateStatus(ctx, crd, syncInfo)
 
 	errs := &kube.ReconcileErrors{}
 
@@ -169,7 +172,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	}
 
 	// Reconcile pods.
-	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums)
+	podRequeue, err := r.podControl.Reconcile(ctx, reporter, crd, configCksums, syncInfo)
 	if err != nil {
 		errs.Append(err)
 	}
@@ -218,21 +221,19 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
 	return stopResult, err
 }
 
-func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) {
-	consensus := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)
-
+func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) {
 	if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
 		status.ObservedGeneration = crd.Status.ObservedGeneration
 		status.Phase = crd.Status.Phase
 		status.StatusMessage = crd.Status.StatusMessage
 		status.Peers = crd.Status.Peers
-		status.SyncInfo = &consensus
-		for _, v := range consensus.Pods {
+		status.SyncInfo = syncInfo
+		for k, v := range syncInfo {
 			if v.Height != nil && *v.Height > 0 {
 				if status.Height == nil {
 					status.Height = make(map[string]uint64)
 				}
-				status.Height[v.Pod] = *v.Height
+				status.Height[k] = *v.Height
 			}
 		}
 	}); err != nil {
diff --git a/internal/cosmos/cache_controller.go b/internal/cosmos/cache_controller.go
index f3577dd7..9db49969 100644
--- a/internal/cosmos/cache_controller.go
+++ b/internal/cosmos/cache_controller.go
@@ -149,6 +149,22 @@ func (c *CacheController) Reconcile(ctx context.Context, req reconcile.Request)
 	return finishResult, nil
 }
 
+// Invalidate removes the given pods status from the cache.
+func (c *CacheController) Invalidate(controller client.ObjectKey, pods []string) {
+	v, _ := c.cache.Get(controller)
+	now := time.Now()
+	for _, s := range v {
+		for _, pod := range pods {
+			if s.Pod.Name == pod {
+				s.Status = CometStatus{}
+				s.Err = fmt.Errorf("invalidated")
+				s.TS = now
+			}
+		}
+	}
+	c.cache.Update(controller, v)
+}
+
 // Collect returns a StatusCollection for the given controller. Only returns cached CometStatus.
 func (c *CacheController) Collect(ctx context.Context, controller client.ObjectKey) StatusCollection {
 	pods, err := c.listPods(ctx, controller)
@@ -168,11 +184,6 @@ func (c *CacheController) SyncedPods(ctx context.Context, controller client.Obje
 	return kube.AvailablePods(c.Collect(ctx, controller).SyncedPods(), 5*time.Second, time.Now())
 }
 
-// PodsWithStatus returns all pods with their status.
-func (c *CacheController) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []PodStatus {
-	return c.Collect(ctx, client.ObjectKeyFromObject(crd)).PodsWithStatus(crd)
-}
-
 func (c *CacheController) listPods(ctx context.Context, controller client.ObjectKey) ([]corev1.Pod, error) {
 	var pods corev1.PodList
 	if err := c.client.List(ctx, &pods,
diff --git a/internal/cosmos/status_collection.go b/internal/cosmos/status_collection.go
index 672df04b..aefa018f 100644
--- a/internal/cosmos/status_collection.go
+++ b/internal/cosmos/status_collection.go
@@ -6,7 +6,6 @@ import (
 	"time"
 
 	"github.com/samber/lo"
-	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
 	"github.com/strangelove-ventures/cosmos-operator/internal/kube"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -114,46 +113,3 @@ func (coll StatusCollection) Synced() StatusCollection {
 func (coll StatusCollection) SyncedPods() []*corev1.Pod {
 	return lo.Map(coll.Synced(), func(status StatusItem, _ int) *corev1.Pod { return status.GetPod() })
 }
-
-// PodStatus is the status of a pod.
-type PodStatus struct {
-	Pod             *corev1.Pod
-	RPCReachable    bool
-	Synced          bool
-	AwaitingUpgrade bool
-}
-
-// PodsWithStatus returns all pods with their status.
-func (coll StatusCollection) PodsWithStatus(crd *cosmosv1.CosmosFullNode) []PodStatus {
-	out := make([]PodStatus, len(coll))
-	for i, status := range coll {
-		ps := PodStatus{
-			Pod: status.GetPod(),
-		}
-		if crd.Spec.ChainSpec.Versions != nil {
-			instanceHeight := uint64(0)
-			if height, ok := crd.Status.Height[status.Pod.Name]; ok {
-				instanceHeight = height
-			}
-			var image string
-			for _, version := range crd.Spec.ChainSpec.Versions {
-				if instanceHeight < version.UpgradeHeight {
-					break
-				}
-				image = version.Image
-			}
-			if image != "" && status.Pod.Spec.Containers[0].Image != image {
-				ps.AwaitingUpgrade = true
-			}
-		}
-		if status.Err == nil {
-			ps.RPCReachable = true
-			if !status.Status.Result.SyncInfo.CatchingUp {
-				ps.Synced = true
-			}
-		}
-
-		out[i] = ps
-	}
-	return out
-}
diff --git a/internal/fullnode/pod_control.go b/internal/fullnode/pod_control.go
index 789b2208..8fc8a0d0 100644
--- a/internal/fullnode/pod_control.go
+++ b/internal/fullnode/pod_control.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
-	"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
 	"github.com/strangelove-ventures/cosmos-operator/internal/diff"
 	"github.com/strangelove-ventures/cosmos-operator/internal/kube"
 	corev1 "k8s.io/api/core/v1"
@@ -16,10 +15,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-type PodFilter interface {
-	PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus
-}
-
 // Client is a controller client. It is a subset of client.Client.
 type Client interface {
 	client.Reader
@@ -28,25 +23,35 @@ type Client interface {
 	Scheme() *runtime.Scheme
 }
 
+type CacheInvalidator interface {
+	Invalidate(controller client.ObjectKey, pods []string)
+}
+
 // PodControl reconciles pods for a CosmosFullNode.
 type PodControl struct {
-	client         Client
-	podFilter      PodFilter
-	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
+	client           Client
+	cacheInvalidator CacheInvalidator
+	computeRollout   func(maxUnavail *intstr.IntOrString, desired, ready int) int
 }
 
 // NewPodControl returns a valid PodControl.
-func NewPodControl(client Client, filter PodFilter) PodControl {
+func NewPodControl(client Client, cacheInvalidator CacheInvalidator) PodControl {
 	return PodControl{
-		client:         client,
-		podFilter:      filter,
-		computeRollout: kube.ComputeRollout,
+		client:           client,
+		cacheInvalidator: cacheInvalidator,
+		computeRollout:   kube.ComputeRollout,
 	}
 }
 
 // Reconcile is the control loop for pods. The bool return value, if true, indicates the controller should requeue
 // the request.
-func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd *cosmosv1.CosmosFullNode, cksums ConfigChecksums) (bool, kube.ReconcileError) {
+func (pc PodControl) Reconcile(
+	ctx context.Context,
+	reporter kube.Reporter,
+	crd *cosmosv1.CosmosFullNode,
+	cksums ConfigChecksums,
+	syncInfo map[string]*cosmosv1.SyncInfoPodStatus,
+) (bool, kube.ReconcileError) {
 	var pods corev1.PodList
 	if err := pc.client.List(ctx, &pods,
 		client.InNamespace(crd.Namespace),
@@ -62,7 +67,7 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
 	diffed := diff.New(ptrSlice(pods.Items), wantPods)
 
 	for _, pod := range diffed.Creates() {
-		reporter.Info("Creating pod", "pod", pod.Name)
+		reporter.Info("Creating pod", "name", pod.Name)
 		if err := ctrl.SetControllerReference(crd, pod, pc.client.Scheme()); err != nil {
 			return true, kube.TransientError(fmt.Errorf("set controller reference on pod %q: %w", pod.Name, err))
 		}
@@ -71,11 +76,24 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
 		}
 	}
 
+	var invalidateCache []string
+
+	defer func() {
+		if pc.cacheInvalidator == nil {
+			return
+		}
+		if len(invalidateCache) > 0 {
+			pc.cacheInvalidator.Invalidate(client.ObjectKeyFromObject(crd), invalidateCache)
+		}
+	}()
+
 	for _, pod := range diffed.Deletes() {
-		reporter.Info("Deleting pod", "pod", pod.Name)
+		reporter.Info("Deleting pod", "name", pod.Name)
 		if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); kube.IgnoreNotFound(err) != nil {
 			return true, kube.TransientError(fmt.Errorf("delete pod %q: %w", pod.Name, err))
 		}
+		delete(syncInfo, pod.Name)
+		invalidateCache = append(invalidateCache, pod.Name)
 	}
 
 	if len(diffed.Creates())+len(diffed.Deletes()) > 0 {
@@ -86,37 +104,49 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
 	diffedUpdates := diffed.Updates()
 	if len(diffedUpdates) > 0 {
 		var (
-			podsWithStatus   = pc.podFilter.PodsWithStatus(ctx, crd)
-			upgradePods      = make(map[string]bool)
-			otherUpdates     = make(map[string]*corev1.Pod)
-			rpcReachablePods = make(map[string]bool)
-			inSyncPods       = make(map[string]bool)
-			deletedPods      = make(map[string]bool)
+			updatedPods      = 0
+			rpcReachablePods = 0
+			inSyncPods       = 0
+			otherUpdates     = []*corev1.Pod{}
 		)
 
-		for _, ps := range podsWithStatus {
-			if ps.Synced {
-				inSyncPods[ps.Pod.Name] = true
+		for _, existing := range pods.Items {
+			podName := existing.Name
+
+			if existing.DeletionTimestamp != nil {
+				// Pod is being deleted, so we skip it.
+				continue
 			}
-			if ps.RPCReachable {
-				rpcReachablePods[ps.Pod.Name] = true
+
+			var rpcReachable bool
+			if ps, ok := syncInfo[podName]; ok {
+				if ps.InSync != nil && *ps.InSync {
+					inSyncPods++
+				}
+				rpcReachable = ps.Error == nil
+				if rpcReachable {
+					rpcReachablePods++
+				}
 			}
 			for _, update := range diffedUpdates {
-				if ps.Pod.Name == update.Name {
-					if ps.AwaitingUpgrade {
-						if !ps.RPCReachable {
-							upgradePods[ps.Pod.Name] = true
-							reporter.Info("Deleting pod for version upgrade", "podName", ps.Pod.Name)
+				if podName == update.Name {
+					if existing.Spec.Containers[0].Image != update.Spec.Containers[0].Image {
+						// awaiting upgrade
+						if !rpcReachable {
+							updatedPods++
+							reporter.Info("Deleting pod for version upgrade", "name", podName)
 							// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
-							if err := pc.client.Delete(ctx, ps.Pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
-								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", ps.Pod.Name, err))
+							if err := pc.client.Delete(ctx, update, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
+								return true, kube.TransientError(fmt.Errorf("upgrade pod version %q: %w", podName, err))
 							}
-							deletedPods[ps.Pod.Name] = true
+							syncInfo[podName].InSync = nil
+							syncInfo[podName].Error = ptr("version upgrade in progress")
+							invalidateCache = append(invalidateCache, podName)
 						} else {
-							otherUpdates[ps.Pod.Name] = ps.Pod
+							otherUpdates = append(otherUpdates, update)
 						}
 					} else {
-						otherUpdates[ps.Pod.Name] = ps.Pod
+						otherUpdates = append(otherUpdates, update)
 					}
 					break
 				}
@@ -125,40 +155,41 @@ func (pc PodControl) Reconcile(ctx context.Context, reporter kube.Reporter, crd
 
 		// If we don't have any pods in sync, we are down anyways, so we can use the number of RPC reachable pods for computing the rollout,
 		// with the goal of recovering the pods as quickly as possible.
-		ready := len(inSyncPods)
+		ready := inSyncPods
 		if ready == 0 {
-			ready = len(rpcReachablePods)
+			ready = rpcReachablePods
 		}
 		numUpdates := pc.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), ready)
 
-		updated := len(upgradePods)
-
-		if updated == len(diffedUpdates) {
+		if updatedPods == len(diffedUpdates) {
 			// All pods are updated.
 			return false, nil
 		}
 
-		if updated >= numUpdates {
+		if updatedPods >= numUpdates {
 			// Signal requeue.
 			return true, nil
 		}
 
-		for podName, pod := range otherUpdates {
-			reporter.Info("Deleting pod for update", "podName", podName)
+		for _, pod := range otherUpdates {
+			podName := pod.Name
+			reporter.Info("Deleting pod for update", "name", podName)
 			// Because we should watch for deletes, we get a re-queued request, detect pod is missing, and re-create it.
 			if err := pc.client.Delete(ctx, pod, client.PropagationPolicy(metav1.DeletePropagationForeground)); client.IgnoreNotFound(err) != nil {
 				return true, kube.TransientError(fmt.Errorf("update pod %q: %w", podName, err))
 			}
-			deletedPods[podName] = true
-			updated++
-			if updated >= numUpdates {
+			syncInfo[podName].InSync = nil
+			syncInfo[podName].Error = ptr("update in progress")
+			invalidateCache = append(invalidateCache, podName)
+			updatedPods++
+			if updatedPods >= numUpdates {
 				// done for this round
 				break
 			}
 		}
 
-		if len(diffedUpdates) == updated {
+		if len(diffedUpdates) == updatedPods {
 			// All pods are updated.
 			return false, nil
 		}
 
diff --git a/internal/fullnode/pod_control_test.go b/internal/fullnode/pod_control_test.go
index 41480a69..023857c2 100644
--- a/internal/fullnode/pod_control_test.go
+++ b/internal/fullnode/pod_control_test.go
@@ -5,9 +5,7 @@ import (
 	"fmt"
 	"testing"
 
-	"github.com/samber/lo"
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
-	"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
 	"github.com/strangelove-ventures/cosmos-operator/internal/diff"
 	"github.com/strangelove-ventures/cosmos-operator/internal/kube"
 	"github.com/stretchr/testify/require"
@@ -17,24 +15,51 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-type mockPodFilter func(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus
+type mockPodClient struct{ mockClient[*corev1.Pod] }
 
-func (fn mockPodFilter) PodsWithStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-	if ctx == nil {
-		panic("nil context")
+func newMockPodClient(pods []*corev1.Pod) *mockPodClient {
+	return &mockPodClient{
+		mockClient: mockClient[*corev1.Pod]{
+			ObjectList: corev1.PodList{
+				Items: valueSlice(pods),
+			},
+		},
+	}
+}
+
+func (c *mockPodClient) setPods(pods []*corev1.Pod) {
+	c.ObjectList = corev1.PodList{
+		Items: valueSlice(pods),
+	}
+}
+
+func (c *mockPodClient) upgradePods(
+	t *testing.T,
+	crdName string,
+	ordinals ...int,
+) {
+	existing := ptrSlice(c.ObjectList.(corev1.PodList).Items)
+	for _, ordinal := range ordinals {
+		updatePod(t, crdName, ordinal, existing, newPodWithNewImage, true)
 	}
-	return fn(ctx, crd)
+	c.setPods(existing)
 }
 
-var panicPodFilter = mockPodFilter(func(context.Context, *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-	panic("SyncedPods should not be called")
-})
+func (c *mockPodClient) deletePods(
+	t *testing.T,
+	crdName string,
+	ordinals ...int,
+) {
+	existing := ptrSlice(c.ObjectList.(corev1.PodList).Items)
+	for _, ordinal := range ordinals {
+		updatePod(t, crdName, ordinal, existing, deletedPod, false)
+	}
+	c.setPods(existing)
+}
 
 func TestPodControl_Reconcile(t *testing.T) {
 	t.Parallel()
 
-	type mockPodClient = mockClient[*corev1.Pod]
-
 	ctx := context.Background()
 	const namespace = "test"
@@ -46,15 +71,18 @@ func TestPodControl_Reconcile(t *testing.T) {
 		pods, err := BuildPods(&crd, nil)
 		require.NoError(t, err)
 
-		existing := diff.New(nil, pods).Creates()[0]
+		existing := diff.New(nil, pods).Creates()
 
-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: []corev1.Pod{*existing},
+		require.Len(t, existing, 1)
+
+		mClient := newMockPodClient(existing)
+
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {InSync: ptr(true)},
 		}
 
-		control := NewPodControl(&mClient, panicPodFilter)
-		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil)
+		control := NewPodControl(mClient, nil)
+		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		require.False(t, requeue)
@@ -74,16 +102,13 @@ func TestPodControl_Reconcile(t *testing.T) {
 		crd.Namespace = namespace
 		crd.Spec.Replicas = 3
 
-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: []corev1.Pod{
-				{ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}},
-				{ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}},
-			},
-		}
+		mClient := newMockPodClient([]*corev1.Pod{
+			{ObjectMeta: metav1.ObjectMeta{Name: "hub-98"}},
+			{ObjectMeta: metav1.ObjectMeta{Name: "hub-99"}},
+		})
 
-		control := NewPodControl(&mClient, panicPodFilter)
-		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil)
+		control := NewPodControl(mClient, nil)
+		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, nil)
 		require.NoError(t, err)
 
 		require.True(t, requeue)
@@ -107,73 +132,68 @@ func TestPodControl_Reconcile(t *testing.T) {
 		pods, err := BuildPods(&crd, nil)
 		require.NoError(t, err)
 
-		existing := diff.New(nil, pods).Creates()
-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient := newMockPodClient(diff.New(nil, pods).Creates())
 
-		var didFilter bool
-		podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				return cosmos.PodStatus{
-					Pod:          pod,
-					RPCReachable: true,
-					Synced:       true,
-				}
-			})
-		})
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {InSync: ptr(true)},
+			"hub-1": {InSync: ptr(true)},
+			"hub-2": {InSync: ptr(true)},
+			"hub-3": {InSync: ptr(true)},
+			"hub-4": {InSync: ptr(true)},
+		}
 
-		control := NewPodControl(&mClient, podFilter)
-		const stubRollout = 5
+		control := NewPodControl(mClient, nil)
 
 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
-			require.Equal(t, stubRollout, ready) // mockPodFilter only returns 1 candidate as ready
+			require.Equal(t, 5, ready) // mockPodFilter only returns 1 candidate as ready
 			return kube.ComputeRollout(maxUnavail, desired, ready)
 		}
 
 		// Trigger updates
 		crd.Spec.PodTemplate.Image = "new-image"
-		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil)
+		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		require.True(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 		require.Equal(t, 2, mClient.DeleteCount)
 
-		didFilter = false
-		podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				ps := cosmos.PodStatus{
-					Pod:          pod,
-					RPCReachable: true,
-					Synced:       true,
-				}
-				if i < 2 {
-					ps.RPCReachable = false
-					ps.Synced = false
-				}
-				return ps
-			})
-		})
+		mClient.deletePods(t, crd.Name, 0, 1)
 
-		control = NewPodControl(&mClient, podFilter)
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 3, ready) // only 3 should be marked ready because 2 are in the deleting state.
+			return kube.ComputeRollout(maxUnavail, desired, ready)
 		}
 
-		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil)
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
+
 		require.True(t, requeue)
-		require.True(t, didFilter)
+		// pod status has not changed, but 0 and 1 are now in deleting state.
+		// should not delete any more.
+		require.Equal(t, 2, mClient.DeleteCount)
+
+		// once pod deletion is complete, new pods are created with new image.
+		mClient.upgradePods(t, crd.Name, 0, 1)
+
+		syncInfo["hub-0"].InSync = nil
+		syncInfo["hub-0"].Error = ptr("upgrade in progress")
+
+		syncInfo["hub-1"].InSync = nil
+		syncInfo["hub-1"].Error = ptr("upgrade in progress")
+
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 3, ready)
+			return kube.ComputeRollout(maxUnavail, desired, ready)
+		}
+
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
+		require.NoError(t, err)
+		require.True(t, requeue)
 
 		require.Zero(t, mClient.CreateCount)
@@ -206,28 +226,33 @@ func TestPodControl_Reconcile(t *testing.T) {
 		require.NoError(t, err)
 		existing := diff.New(nil, pods).Creates()
 
-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient := newMockPodClient(existing)
 
-		var didFilter bool
-		podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				return cosmos.PodStatus{
-					Pod: pod,
-					// pods are at or above upgrade height and not reachable
-					AwaitingUpgrade: true,
-					RPCReachable:    true,
-					Synced:          false,
-				}
-			})
-		})
+		// pods are at upgrade height and reachable
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-1": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-2": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-3": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+			"hub-4": {
+				Height: ptr(uint64(100)),
+				InSync: ptr(true),
+			},
+		}
 
-		control := NewPodControl(&mClient, podFilter)
+		control := NewPodControl(mClient, nil)
 
 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -242,145 +267,126 @@ func TestPodControl_Reconcile(t *testing.T) {
 
 		// Reconcile 1, should update 0 and 1
 
-		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil)
+		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		// only handled 2 updates, so should requeue.
 		require.True(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 		require.Equal(t, 2, mClient.DeleteCount)
 
-		existing[0].Spec.Containers[0].Image = "new-image"
-		existing[1].Spec.Containers[0].Image = "new-image"
+		mClient.deletePods(t, crd.Name, 0, 1)
 
-		recalculatePodRevision(existing[0], 0)
-		recalculatePodRevision(existing[1], 1)
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 3, ready) // only 3 should be marked ready because 2 are in the deleting state.
+			return kube.ComputeRollout(maxUnavail, desired, ready)
 		}
 
-		// 2 are now unavailable, working on upgrade
-
-		didFilter = false
-		podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				ps := cosmos.PodStatus{
-					Pod:          pod,
-					RPCReachable: true,
-					Synced:       true,
-				}
-				if i < 2 {
-					ps.RPCReachable = false
-					ps.Synced = false
-				} else {
-					ps.AwaitingUpgrade = true
-				}
-				return ps
-			})
-		})
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
+		require.NoError(t, err)
+
+		require.True(t, requeue)
+
+		// pod status has not changed, but 0 and 1 are now in deleting state.
+		// should not delete any more.
+		require.Equal(t, 2, mClient.DeleteCount)
+
+		mClient.upgradePods(t, crd.Name, 0, 1)
 
-		control = NewPodControl(&mClient, podFilter)
+		// 0 and 1 are now unavailable, working on upgrade
+		syncInfo["hub-0"].InSync = nil
+		syncInfo["hub-0"].Error = ptr("upgrade in progress")
+
+		syncInfo["hub-1"].InSync = nil
+		syncInfo["hub-1"].Error = ptr("upgrade in progress")
 
 		// Reconcile 2, should not update anything because 0 and 1 are still in progress.
 
-		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil)
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 3, ready)
+			return kube.ComputeRollout(maxUnavail, desired, ready)
+		}
+
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		// no further updates yet, should requeue.
 		require.True(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 
 		// should not delete any more yet.
 		require.Equal(t, 2, mClient.DeleteCount)
 
 		// mock out that one of the pods completed the upgrade. should begin upgrading one more
+		syncInfo["hub-0"].InSync = ptr(true)
+		syncInfo["hub-0"].Height = ptr(uint64(101))
+		syncInfo["hub-0"].Error = nil
 
-		didFilter = false
-		podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				ps := cosmos.PodStatus{
-					Pod:          pod,
-					RPCReachable: true,
-					Synced:       true,
-				}
-				if i == 1 {
-					ps.RPCReachable = false
-					ps.Synced = false
-				}
-				if i >= 2 {
-					ps.AwaitingUpgrade = true
-				}
-				return ps
-			})
-		})
-
-		control = NewPodControl(&mClient, podFilter)
+		// Reconcile 3, should update pod 2 (only one) because 1 is still in progress, but 0 is done.
 
-		// Reconcile 3, should update 2 (only one) because 1 is still in progress, but 0 is done.
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 4, ready)
+			return kube.ComputeRollout(maxUnavail, desired, ready)
+		}
 
-		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil)
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		// only handled 1 updates, so should requeue.
 		require.True(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 
 		// should delete one more
 		require.Equal(t, 3, mClient.DeleteCount)
 
-		existing[2].Spec.Containers[0].Image = "new-image"
-		recalculatePodRevision(existing[2], 2)
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
+		mClient.deletePods(t, crd.Name, 2)
+
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 3, ready) // only 3 should be marked ready because 2 is in the deleting state and 1 is still in progress upgrading.
+			return kube.ComputeRollout(maxUnavail, desired, ready)
 		}
 
-		// mock out that both pods completed the upgrade. should begin upgrading the last 2
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
+		require.NoError(t, err)
 
-		didFilter = false
-		podFilter = mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				ps := cosmos.PodStatus{
-					Pod:          pod,
-					RPCReachable: true,
-					Synced:       true,
-				}
-				if i >= 3 {
-					ps.AwaitingUpgrade = true
-				}
-				return ps
-			})
-		})
+		require.True(t, requeue)
+
+		// pod status has not changed, but 2 is now in deleting state.
+		// should not delete any more.
+		require.Equal(t, 3, mClient.DeleteCount)
 
-		control = NewPodControl(&mClient, podFilter)
+		mClient.upgradePods(t, crd.Name, 2)
+
+		// mock out that both pods completed the upgrade. should begin upgrading the last 2
+		syncInfo["hub-1"].InSync = ptr(true)
+		syncInfo["hub-1"].Height = ptr(uint64(101))
+		syncInfo["hub-1"].Error = nil
+
+		syncInfo["hub-2"].InSync = ptr(true)
+		syncInfo["hub-2"].Height = ptr(uint64(101))
+		syncInfo["hub-2"].Error = nil
 
 		// Reconcile 4, should update 3 and 4 because the rest are done.
 
-		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil)
+		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
+			require.EqualValues(t, crd.Spec.Replicas, desired)
+			require.Equal(t, 5, ready)
+			return kube.ComputeRollout(maxUnavail, desired, ready)
+		}
+
+		requeue, err = control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		// all updates are now handled, no longer need requeue.
 		require.False(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 
 		// should delete the last 2
@@ -413,28 +419,33 @@ func TestPodControl_Reconcile(t *testing.T) {
 		require.NoError(t, err)
 		existing := diff.New(nil, pods).Creates()
 
-		var mClient mockPodClient
-		mClient.ObjectList = corev1.PodList{
-			Items: valueSlice(existing),
-		}
+		mClient := newMockPodClient(existing)
 
-		var didFilter bool
-		podFilter := mockPodFilter(func(_ context.Context, crd *cosmosv1.CosmosFullNode) []cosmos.PodStatus {
-			require.Equal(t, namespace, crd.Namespace)
-			require.Equal(t, "hub", crd.Name)
-			didFilter = true
-			return lo.Map(existing, func(pod *corev1.Pod, i int) cosmos.PodStatus {
-				return cosmos.PodStatus{
-					Pod: pod,
-					// pods are at or above upgrade height and not reachable
-					AwaitingUpgrade: true,
-					RPCReachable:    false,
-					Synced:          false,
-				}
-			})
-		})
+		// pods are at upgrade height and reachable
+		syncInfo := map[string]*cosmosv1.SyncInfoPodStatus{
+			"hub-0": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-1": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-2": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-3": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+			"hub-4": {
+				Height: ptr(uint64(100)),
+				Error:  ptr("panic at upgrade height"),
+			},
+		}
 
-		control := NewPodControl(&mClient, podFilter)
+		control := NewPodControl(mClient, nil)
 
 		control.computeRollout = func(maxUnavail *intstr.IntOrString, desired, ready int) int {
 			require.EqualValues(t, crd.Spec.Replicas, desired)
@@ -447,14 +458,12 @@ func TestPodControl_Reconcile(t *testing.T) {
 			crd.Status.Height[pod.Name] = 100
 		}
 
-		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil)
+		requeue, err := control.Reconcile(ctx, nopReporter, &crd, nil, syncInfo)
 		require.NoError(t, err)
 
 		// all updates are handled, so should not requeue
 		require.False(t, requeue)
-		require.True(t, didFilter)
-
 		require.Zero(t, mClient.CreateCount)
 		require.Equal(t, 5, mClient.DeleteCount)
 	})
@@ -468,3 +477,27 @@ func recalculatePodRevision(pod *corev1.Pod, ordinal int) {
 	pod.Labels["app.kubernetes.io/revision"] = rev1
 	pod.Annotations["app.kubernetes.io/ordinal"] = fmt.Sprintf("%d", ordinal)
 }
+
+func newPodWithNewImage(pod *corev1.Pod) {
+	pod.DeletionTimestamp = nil
+	pod.Spec.Containers[0].Image = "new-image"
+}
+
+func deletedPod(pod *corev1.Pod) {
+	pod.DeletionTimestamp = ptr(metav1.Now())
+}
+
+func updatePod(t *testing.T, crdName string, ordinal int, pods []*corev1.Pod, updateFn func(pod *corev1.Pod), recalc bool) {
+	podName := fmt.Sprintf("%s-%d", crdName, ordinal)
+	for _, pod := range pods {
+		if pod.Name == podName {
+			updateFn(pod)
+			if recalc {
+				recalculatePodRevision(pod, ordinal)
+			}
+			return
+		}
+	}
+
+	require.FailNow(t, "pod not found", podName)
+}
diff --git a/internal/fullnode/status.go b/internal/fullnode/status.go
index 02842c05..4ca46360 100644
--- a/internal/fullnode/status.go
+++ b/internal/fullnode/status.go
@@ -3,7 +3,6 @@ package fullnode
 import (
 	"context"
 
-	"github.com/samber/lo"
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
 	"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -27,24 +26,25 @@ func SyncInfoStatus(
 	ctx context.Context,
 	crd *cosmosv1.CosmosFullNode,
 	collector StatusCollector,
-) cosmosv1.SyncInfoStatus {
-	var status cosmosv1.SyncInfoStatus
+) map[string]*cosmosv1.SyncInfoPodStatus {
+	status := make(map[string]*cosmosv1.SyncInfoPodStatus, crd.Spec.Replicas)
 
 	coll := collector.Collect(ctx, client.ObjectKeyFromObject(crd))
-	status.Pods = lo.Map(coll, func(item cosmos.StatusItem, _ int) cosmosv1.SyncInfoPodStatus {
+	for _, item := range coll {
 		var stat cosmosv1.SyncInfoPodStatus
-		stat.Pod = item.GetPod().Name
+		podName := item.GetPod().Name
 		stat.Timestamp = metav1.NewTime(item.Timestamp())
 		comet, err := item.GetStatus()
 		if err != nil {
 			stat.Error = ptr(err.Error())
-			return stat
+			status[podName] = &stat
+			continue
 		}
 		stat.Height = ptr(comet.LatestBlockHeight())
 		stat.InSync = ptr(!comet.Result.SyncInfo.CatchingUp)
-		return stat
-	})
+		status[podName] = &stat
+	}
 
 	return status
 }
diff --git a/internal/fullnode/status_test.go b/internal/fullnode/status_test.go
index b3763169..03a52ee0 100644
--- a/internal/fullnode/status_test.go
+++ b/internal/fullnode/status_test.go
@@ -72,24 +72,20 @@ func TestSyncInfoStatus(t *testing.T) {
 	}
 
 	wantTS := metav1.NewTime(ts)
-	want := cosmosv1.SyncInfoStatus{
-		Pods: []cosmosv1.SyncInfoPodStatus{
-			{
-				Pod:       "pod-0",
-				Timestamp: wantTS,
-				Height:    ptr(uint64(9999)),
-				InSync:    ptr(false),
-			},
-			{Pod: "pod-1",
-				Timestamp: wantTS,
-				Height:    ptr(uint64(10000)),
-				InSync:    ptr(true),
-			},
-			{
-				Pod:       "pod-2",
-				Timestamp: wantTS,
-				Error:     ptr("some error"),
-			},
+	want := map[string]*cosmosv1.SyncInfoPodStatus{
+		"pod-0": {
+			Timestamp: wantTS,
+			Height:    ptr(uint64(9999)),
+			InSync:    ptr(false),
+		},
+		"pod-1": {
+			Timestamp: wantTS,
+			Height:    ptr(uint64(10000)),
+			InSync:    ptr(true),
+		},
+		"pod-2": {
+			Timestamp: wantTS,
+			Error:     ptr("some error"),
 		},
 	}