Skip to content

Commit 84fbb5c

Browse files
authored
Implement changes for Multi-Cluster coordination between multiple operator instances. (#2259)
* Implement changes for Multi-Cluster coordination between multiple operator instances.
1 parent 23bf338 commit 84fbb5c

File tree

88 files changed

+4105
-578
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+4105
-578
lines changed

Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ fmt: bin/fmt_check
162162
# TODO johscheuer: enable those new command in a new PR.
163163
bin/fmt_check: ${GO_ALL}
164164
# $(GO_LINES) -w .
165-
go fmt $$(go list ./...)
166-
# $(GO_IMPORTS) -w .
165+
@go fmt $$(go list ./...)
166+
#@$(GO_IMPORTS) -w $(GO_SRC)
167167
#$(GOLANGCI_LINT) run --fix
168168
@mkdir -p bin
169169
@touch $@

api/v1beta2/foundationdb_status_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ package v1beta2
2222

2323
import (
2424
"encoding/json"
25-
"k8s.io/utils/pointer"
2625
"net"
2726
"os"
2827
"path/filepath"
2928

29+
"k8s.io/utils/pointer"
30+
3031
. "github.com/onsi/ginkgo/v2"
3132
. "github.com/onsi/gomega"
3233
)

api/v1beta2/foundationdbcluster_types.go

+43
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,19 @@ type FoundationDBClusterAutomationOptions struct {
12101210
// The default is a list that includes "fdb-kubernetes-operator".
12111211
// +kubebuilder:validation:MaxItems=10
12121212
IgnoreLogGroupsForUpgrade []LogGroup `json:"ignoreLogGroupsForUpgrade,omitempty"`
1213+
1214+
// SynchronizationMode defines the synchronization mode for clusters that are managed by multiple operator instances.
1215+
// The default is "local" which means all operator instances are only acting on their local processes, with the exception for
1216+
// cluster upgrades. In the "global" mode the operator instances coordinate actions to only issue a single
1217+
// exclude/bounce/include to reduce the disruptions. The global coordination mode is based on an optimistic mode
1218+
// and there are no guarantees that the action will only be executed once, e.g. because of a slow operator instance.
1219+
//
1220+
// More details:
1221+
// https://github.com/FoundationDB/fdb-kubernetes-operator/blob/main/docs/design/better_coordination_multi_operator.md
1222+
// +kubebuilder:validation:Optional
1223+
// +kubebuilder:validation:Enum=local;global
1224+
// +kubebuilder:default:=local
1225+
SynchronizationMode *string `json:"synchronizationMode,omitempty"`
12131226
}
12141227

12151228
// LogGroup represents a LogGroup used by a FoundationDB process to log trace events. The LogGroup can be used to filter
@@ -3144,3 +3157,33 @@ func (cluster *FoundationDBCluster) GetMaxFaultDomainsWithTaintedProcessGroups(f
31443157

31453158
return maxAllowed, nil
31463159
}
3160+
3161+
// UpdateAction defines the update action for an entry in the multi-region coordination key-space.
3162+
type UpdateAction string
3163+
3164+
const (
3165+
// UpdateActionAdd will add or update the provided information.
3166+
UpdateActionAdd UpdateAction = "add"
3167+
// UpdateActionDelete will remove the provided associated information.
3168+
UpdateActionDelete UpdateAction = "delete"
3169+
)
3170+
3171+
// SynchronizationMode defines the synchronization mode.
3172+
// +kubebuilder:validation:MaxLength=256
3173+
type SynchronizationMode string
3174+
3175+
const (
3176+
// SynchronizationModeLocal defines the local synchronization mode.
3177+
SynchronizationModeLocal SynchronizationMode = "local"
3178+
// SynchronizationModeGlobal SynchronizationMode the global synchronization mode.
3179+
SynchronizationModeGlobal SynchronizationMode = "global"
3180+
)
3181+
3182+
// GetSynchronizationMode returns the current SynchronizationMode.
3183+
func (cluster *FoundationDBCluster) GetSynchronizationMode() SynchronizationMode {
3184+
if !cluster.Status.Configured {
3185+
return SynchronizationModeLocal
3186+
}
3187+
3188+
return SynchronizationMode(pointer.StringDeref(cluster.Spec.AutomationOptions.SynchronizationMode, string(SynchronizationModeLocal)))
3189+
}

api/v1beta2/zz_generated.deepcopy.go

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/apps.foundationdb.org_foundationdbclusters.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ spec:
154154
taintReplacementTimeSeconds:
155155
type: integer
156156
type: object
157+
synchronizationMode:
158+
default: local
159+
enum:
160+
- local
161+
- global
162+
type: string
157163
useLocalitiesForExclusion:
158164
type: boolean
159165
useManagementAPI:

controllers/add_pods_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ package controllers
2222

2323
import (
2424
"context"
25+
"sort"
26+
2527
fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
2628
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"
2729
. "github.com/onsi/ginkgo/v2"
2830
. "github.com/onsi/gomega"
2931
corev1 "k8s.io/api/core/v1"
30-
"sort"
3132
)
3233

3334
var _ = Describe("add_pods", func() {

controllers/add_process_groups.go

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package controllers
2323
import (
2424
"context"
2525
"fmt"
26+
2627
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
2728
"github.com/go-logr/logr"
2829

controllers/add_services.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package controllers
2222

2323
import (
2424
"context"
25+
2526
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"
2627
"github.com/go-logr/logr"
2728

controllers/backup_controller_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package controllers
2222

2323
import (
2424
"fmt"
25+
2526
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient/mock"
2627

2728
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"

controllers/bounce_processes.go

+95-26
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ package controllers
2222

2323
import (
2424
"context"
25+
"errors"
2526
"fmt"
2627
"time"
2728

28-
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
29-
3029
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/buggify"
30+
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/coordination"
31+
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
3132

3233
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/restarts"
3334

@@ -38,8 +39,7 @@ import (
3839
"k8s.io/utils/pointer"
3940
)
4041

41-
// bounceProcesses provides a reconciliation step for bouncing fdbserver
42-
// processes.
42+
// bounceProcesses provides a reconciliation step for bouncing fdbserver processes.
4343
type bounceProcesses struct{}
4444

4545
// reconcile runs the reconciler's work.
@@ -64,16 +64,40 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
6464
}
6565
}
6666

67+
// Fetch the processes that are ready for restart for the current cluster, we use that information to determine which
68+
// processes must be added to the set.
69+
var processesReadyForRestart map[fdbv1beta2.ProcessGroupID]time.Time
70+
var processesPendingForRestart map[fdbv1beta2.ProcessGroupID]time.Time
71+
globalSynchronizationMode := cluster.GetSynchronizationMode() == fdbv1beta2.SynchronizationModeGlobal
72+
if globalSynchronizationMode {
73+
processesReadyForRestart, err = adminClient.GetReadyForRestart(cluster.Spec.ProcessGroupIDPrefix)
74+
if err != nil {
75+
return &requeue{curError: err}
76+
}
77+
78+
processesPendingForRestart, err = adminClient.GetPendingForRestart(cluster.Spec.ProcessGroupIDPrefix)
79+
if err != nil {
80+
return &requeue{curError: err}
81+
}
82+
}
83+
6784
currentMinimumUptime, addressMap, err := fdbstatus.GetMinimumUptimeAndAddressMap(logger, cluster, status, r.EnableRecoveryState)
6885
if err != nil {
6986
return &requeue{curError: err}
7087
}
7188

72-
addresses, req := getProcessesReadyForRestart(logger, cluster, addressMap)
89+
addresses, updatesReadyForRestart, updatesPendingForRestart, req := getProcessesReadyForRestart(logger, cluster, addressMap, processesReadyForRestart, processesPendingForRestart)
7390
if req != nil {
7491
return req
7592
}
7693

94+
if globalSynchronizationMode && len(updatesPendingForRestart) > 0 {
95+
err = adminClient.UpdatePendingForRestart(updatesPendingForRestart)
96+
if err != nil {
97+
return &requeue{curError: err}
98+
}
99+
}
100+
77101
// Only perform the check if the cluster controller must be restarted if the cluster was up long enough. This is an
78102
// additional safety guard to reduce the risk of successive restarts in cases where unidirectional partitions occur.
79103
if currentMinimumUptime > r.MinimumRequiredUptimeCCBounce.Seconds() {
@@ -118,6 +142,7 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
118142
return &requeue{curError: err}
119143
}
120144
}
145+
121146
version, err := fdbv1beta2.ParseFdbVersion(cluster.Spec.Version)
122147
if err != nil {
123148
return &requeue{curError: err}
@@ -137,9 +162,18 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
137162
}
138163
}
139164

140-
err = r.takeLock(logger, cluster, fmt.Sprintf("bouncing processes: %v", addresses))
141-
if err != nil {
142-
return &requeue{curError: err}
165+
if globalSynchronizationMode && len(updatesReadyForRestart) > 0 {
166+
err = adminClient.UpdateReadyForRestart(updatesReadyForRestart)
167+
if err != nil {
168+
return &requeue{curError: err}
169+
}
170+
}
171+
172+
if useLocks {
173+
lockErr := lockClient.TakeLock()
174+
if lockErr != nil {
175+
return &requeue{curError: lockErr, delayedRequeue: true}
176+
}
143177
}
144178

145179
if useLocks && upgrading {
@@ -153,6 +187,32 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
153187
}
154188
}
155189

190+
// When the cluster is being upgraded, we use the same synchronization mode as before.
191+
if globalSynchronizationMode && !upgrading {
192+
pendingForRestart, err := adminClient.GetPendingForRestart("")
193+
if err != nil {
194+
return &requeue{curError: err, delayedRequeue: true}
195+
}
196+
197+
readyForRestart, err := adminClient.GetReadyForRestart("")
198+
if err != nil {
199+
return &requeue{curError: err, delayedRequeue: true}
200+
}
201+
202+
err = coordination.AllProcessesReady(logger, pendingForRestart, readyForRestart, r.GlobalSynchronizationWaitDuration)
203+
if err != nil {
204+
waitTimeError := &coordination.WaitTimeError{}
205+
if errors.As(err, waitTimeError) {
206+
return &requeue{curError: err, delayedRequeue: true, delay: waitTimeError.GetWaitTime()}
207+
}
208+
209+
return &requeue{curError: err, delayedRequeue: true}
210+
}
211+
212+
addresses = coordination.GetAddressesFromStatus(logger, status, readyForRestart, false)
213+
logger.Info("Addresses from status", "addresses", addresses)
214+
}
215+
156216
filteredAddresses, removedAddresses := buggify.FilterIgnoredProcessGroups(cluster, addresses, status)
157217
if removedAddresses {
158218
addresses = filteredAddresses
@@ -169,8 +229,13 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
169229
// processes in the cluster.
170230
err = adminClient.KillProcessesForUpgrade(addresses)
171231
} else {
232+
clearErr := adminClient.ClearReadyForRestart()
233+
if clearErr != nil {
234+
logger.Info("Could not remove ready entries for restart, will continue with restart", "error", clearErr.Error())
235+
}
172236
err = adminClient.KillProcesses(addresses)
173237
}
238+
174239
if err != nil {
175240
return &requeue{curError: err}
176241
}
@@ -191,11 +256,13 @@ func (c bounceProcesses) reconcile(_ context.Context, r *FoundationDBClusterReco
191256

192257
// getProcessesReadyForRestart returns a slice of process addresses that can be restarted. If addresses are missing or not all processes
193258
// have the latest configuration this method will return a requeue struct with more details.
194-
func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, addressMap map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.ProcessAddress) ([]fdbv1beta2.ProcessAddress, *requeue) {
259+
func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, addressMap map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.ProcessAddress, processesReadyForRestart map[fdbv1beta2.ProcessGroupID]time.Time, processesPendingForRestart map[fdbv1beta2.ProcessGroupID]time.Time) ([]fdbv1beta2.ProcessAddress, map[fdbv1beta2.ProcessGroupID]fdbv1beta2.UpdateAction, map[fdbv1beta2.ProcessGroupID]fdbv1beta2.UpdateAction, *requeue) {
195260
addresses := make([]fdbv1beta2.ProcessAddress, 0, len(cluster.Status.ProcessGroups))
196261
allSynced := true
197262
versionIncompatibleUpgrade := cluster.IsBeingUpgradedWithVersionIncompatibleVersion()
198263
var missingAddress []fdbv1beta2.ProcessGroupID
264+
updatesReadyForRestart := map[fdbv1beta2.ProcessGroupID]fdbv1beta2.UpdateAction{}
265+
updatesPendingForRestart := map[fdbv1beta2.ProcessGroupID]fdbv1beta2.UpdateAction{}
199266

200267
filterConditions := restarts.GetFilterConditions(cluster)
201268
missingProcesses := map[fdbv1beta2.ProcessClass]int{}
@@ -224,24 +291,19 @@ func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.Foundat
224291
if versionIncompatibleUpgrade && processGroup.IsExcluded() {
225292
logger.Info("adding process group that is marked for exclusion to list of restarted processes", "processGroupID", processGroup.ProcessGroupID)
226293
addresses = append(addresses, addressMap[processGroup.ProcessGroupID]...)
227-
continue
228-
}
229-
}
294+
if _, ok := processesReadyForRestart[processGroup.ProcessGroupID]; !ok {
295+
updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
296+
}
297+
if _, ok := processesPendingForRestart[processGroup.ProcessGroupID]; !ok {
298+
updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
299+
}
230300

231-
// Ignore processes that are missing for more than 30 seconds e.mg. if the process is network partitioned.
232-
// This is required since the update status will not update the SidecarUnreachable setting if a process is
233-
// missing in the status.
234-
if missingTime := processGroup.GetConditionTime(fdbv1beta2.MissingProcesses); missingTime != nil {
235-
if time.Unix(*missingTime, 0).Add(cluster.GetIgnoreMissingProcessesSeconds()).Before(time.Now()) {
236-
logger.Info("ignore process group with missing process", "processGroupID", processGroup.ProcessGroupID)
237-
missingProcesses[processGroup.ProcessClass]++
238301
continue
239302
}
240303
}
241304

242-
// If a Pod is stuck in pending we have to ignore it, as the processes hosted by this Pod will not be running.
243-
if cluster.SkipProcessGroup(processGroup) {
244-
logger.Info("ignore process group with Pod stuck in pending", "processGroupID", processGroup.ProcessGroupID)
305+
// Check if the processes should be ignored.
306+
if restarts.ShouldBeIgnoredBecauseMissing(logger, cluster, processGroup) {
245307
missingProcesses[processGroup.ProcessClass]++
246308
continue
247309
}
@@ -258,20 +320,27 @@ func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.Foundat
258320
continue
259321
}
260322

323+
if _, ok := processesPendingForRestart[processGroup.ProcessGroupID]; !ok {
324+
updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
325+
}
326+
261327
if addressMap[processGroup.ProcessGroupID] == nil {
262328
missingAddress = append(missingAddress, processGroup.ProcessGroupID)
263329
continue
264330
}
265331

266332
addresses = append(addresses, addressMap[processGroup.ProcessGroupID]...)
333+
if _, ok := processesReadyForRestart[processGroup.ProcessGroupID]; !ok {
334+
updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
335+
}
267336
}
268337

269338
if len(missingAddress) > 0 {
270-
return nil, &requeue{message: fmt.Sprintf("could not find address for processes: %s", missingAddress), delayedRequeue: true}
339+
return nil, nil, updatesPendingForRestart, &requeue{message: fmt.Sprintf("could not find address for processes: %s", missingAddress), delayedRequeue: true}
271340
}
272341

273342
if !allSynced {
274-
return nil, &requeue{message: "Waiting for config map to sync to all pods", delayedRequeue: true}
343+
return nil, nil, updatesPendingForRestart, &requeue{message: "Waiting for config map to sync to all pods", delayedRequeue: true}
275344
}
276345

277346
// Only if the cluster is upgraded with an incompatible version we have to make sure that all processes are ready to be restarted.
@@ -280,15 +349,15 @@ func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.Foundat
280349
err := checkIfEnoughProcessesAreRunning(logger, cluster, len(addresses), missingProcesses, markedForRemoval)
281350
// If not all processes are ready to restart we will block the upgrade and delay it.
282351
if err != nil {
283-
return nil, &requeue{
352+
return nil, nil, nil, &requeue{
284353
message: err.Error(),
285354
delay: 5 * time.Second,
286355
delayedRequeue: true,
287356
}
288357
}
289358
}
290359

291-
return addresses, nil
360+
return addresses, updatesReadyForRestart, updatesPendingForRestart, nil
292361
}
293362

294363
// getUpgradeAddressesFromStatus will return the processes that can be upgraded and all the processes that are not ready to be upgraded.

0 commit comments

Comments
 (0)