
Commit f722ac5

Merge pull request #59 from kamil-holubicki/DISTMYSQL-418
DISTMYSQL-418: Introduce separate discovery queue for unhealthy instances
2 parents: b5f9738 + e0484be

8 files changed: +375 −16 lines changed
docs/configuration-discovery-advanced.md

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
# Configuration: advanced discovery

The Orchestrator uses an internal queue to manage instances for discovery. When an instance is ready for discovery, it is added to the queue, and discovery workers process the queue. The `DiscoveryMaxConcurrency` setting in the configuration file controls the number of workers, and therefore determines how many discoveries can happen in parallel.

The Orchestrator uses this mechanism to periodically monitor all instances. The `InstancePollSeconds` configuration parameter determines how often the Orchestrator refreshes the information.
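For example (a minimal sketch; `InstancePollSeconds: 5` mirrors the default set in `go/config/config.go` below, while the `DiscoveryMaxConcurrency` value is illustrative, not a recommendation):

```json
{
  "DiscoveryMaxConcurrency": 300,
  "InstancePollSeconds": 5
}
```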
When there are many inaccessible or unhealthy instances, the Orchestrator may lose its proper view of the cluster and be late with needed recovery actions. Discoveries of such instances may take a long time and finish with failure anyway, consuming workers from the discovery pool in the meantime. Healthy instances wait in the queue and are not checked in a timely manner.

To avoid this, the Orchestrator can be configured to maintain a separate discovery queue for unhealthy instances, processed by a separate pool of workers. Additionally, an exponential backoff mechanism can be applied when rechecking such instances.
Configuration example:

```json
{
  "DeadInstanceDiscoveryMaxConcurrency": 100,
  "DeadInstancePollSecondsMultiplyFactor": 1.5,
  "DeadInstancePollSecondsMax": 60,
  "DeadInstanceDiscoveryLogsEnabled": true
}
```
`DeadInstanceDiscoveryMaxConcurrency` (default: 0) - Determines the number of discovery workers dedicated to dead instances. If this pool size is greater than 0, the Orchestrator maintains a separate queue for dead instances.

`DeadInstancePollSecondsMultiplyFactor` (default: 1) - Floating-point number; allowed values are >= 1. Determines how aggressive the backoff mechanism is. By default, when `DeadInstancePollSecondsMultiplyFactor = 1`, the instance is checked every `InstancePollSeconds` seconds. If the parameter value is greater than 1, every consecutive try `n` is done after a period calculated according to the formula:

dT(n) = InstancePollSeconds * DeadInstancePollSecondsMultiplyFactor ^ (n-1)
Example:

Let `D` stand for `DeadInstancePollSecondsMultiplyFactor`:

f(1) = 1\
f(2) = f(1) * D\
f(3) = f(2) * D\
f(4) = f(3) * D

That means:

f(4) = 1 * D * D * D = D^3

or, in other words:

f(n) = DeadInstancePollSecondsMultiplyFactor ^ (n-1)

so:

dT(n) = InstancePollSeconds * f(n)\
dT(n) = InstancePollSeconds * DeadInstancePollSecondsMultiplyFactor ^ (n-1)
Note that `DeadInstanceDiscoveryMaxConcurrency` controls whether the separate pool of discovery workers is created, but it has no impact on the backoff mechanism controlled by `DeadInstancePollSecondsMultiplyFactor`. This has the following implications:

1. `DeadInstanceDiscoveryMaxConcurrency > 0` and `DeadInstancePollSecondsMultiplyFactor > 1`:\
A separate discovery queue for dead instances is created, dead instances are checked by a dedicated pool of Go workers, and exponential backoff is applied between rechecks.
2. `DeadInstanceDiscoveryMaxConcurrency = 0` and `DeadInstancePollSecondsMultiplyFactor > 1`:\
No separate discovery queue for dead instances is created, and dead instances are checked by the same pool of Go workers as healthy instances; however, exponential backoff is applied for dead instances.
3. `DeadInstanceDiscoveryMaxConcurrency > 0` and `DeadInstancePollSecondsMultiplyFactor = 1`:\
A separate discovery queue for dead instances is created, and dead instances are checked by a dedicated pool of Go workers. No exponential backoff is applied for dead instances.
4. `DeadInstanceDiscoveryMaxConcurrency = 0` and `DeadInstancePollSecondsMultiplyFactor = 1`:\
There is no separate discovery queue for dead instances, no dedicated Go workers, and no backoff mechanism. This is the default working mode.

`DeadInstancePollSecondsMax` (default: 300) - Controls the maximum delay of the backoff mechanism. If the backoff calculation goes beyond this value, the delay is considered saturated and stays at `DeadInstancePollSecondsMax`.
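To make the schedule concrete, here is a minimal standalone sketch (an illustration, not code from the Orchestrator source) that prints the recheck delays for the example configuration above (`InstancePollSeconds = 5`, factor `1.5`, max `60`), including saturation:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	instancePollSeconds := 5.0 // InstancePollSeconds
	factor := 1.5              // DeadInstancePollSecondsMultiplyFactor
	maxSeconds := 60.0         // DeadInstancePollSecondsMax

	// dT(n) = InstancePollSeconds * factor^(n-1), capped at maxSeconds.
	delay := instancePollSeconds
	for n := 1; n <= 10; n++ {
		fmt.Printf("try %2d: next recheck in %v\n", n, time.Duration(delay*float64(time.Second)))
		delay *= factor
		if delay > maxSeconds {
			delay = maxSeconds // saturation
		}
	}
}
```

For these values the sequence is 5s, 7.5s, 11.25s, 16.875s, ... until it saturates at 60s.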
## Diagnostics

The Orchestrator provides the `debug/metrics` web endpoint for diagnostics.

`discoveries.dead_instances` - provides the number of instances currently registered as dead.\
`discoveries.dead_instances_queue_length` - provides the current length of the queue dedicated to dead instances. Note this is valid only when `DeadInstanceDiscoveryMaxConcurrency > 0`, i.e. when a separate queue is used. In other cases it is always zero.

Other diagnostics endpoints:

`api/discovery-queue-metrics-raw/:seconds` - provides the raw metrics for a given time for the `DEFAULT` discovery queue.\
`api/discovery-queue-metrics-raw/:queue/:seconds` - provides the raw metrics for a given time for the supplied (`DEFAULT` or `DEADINSTANCES`) discovery queue.\
`api/discovery-queue-metrics-aggregated/:seconds` - provides aggregated metrics for a given time for the `DEFAULT` discovery queue.\
`api/discovery-queue-metrics-aggregated/:queue/:seconds` - provides aggregated metrics for a given time for the supplied (`DEFAULT` or `DEADINSTANCES`) discovery queue.

Note that the `DEADINSTANCES` queue is available only if `DeadInstanceDiscoveryMaxConcurrency > 0`.
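Usage sketch (assuming an Orchestrator instance listening on the default `localhost:3000`):

```
# Raw metrics for the DEADINSTANCES queue over the last 60 seconds.
# Responds with an API error when DeadInstanceDiscoveryMaxConcurrency = 0,
# because the queue is then never created.
curl -s "http://localhost:3000/api/discovery-queue-metrics-raw/DEADINSTANCES/60"
```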
## Logging

Logging of the dead instance discovery process is controlled via the boolean `DeadInstanceDiscoveryLogsEnabled` parameter. It is disabled by default.
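With logging enabled, debug entries follow the formats used in this commit's `go/inst/dead_instance_filter.go`; for example (hostname and values hypothetical, timestamp elided):

```
Dead instance registered db1.example.com:3306. Iteration: 3. Current delay factor: 2.25 (next check in 11.25s (on ...))
```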

docs/configuration.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -10,6 +10,7 @@ Use the following small steps to configure `orchestrator`:
 
 - [Backend](configuration-backend.md)
 - [Discovery: basic](configuration-discovery-basic.md)
+- [Discovery: advanced](configuration-discovery-advanced.md)
 - [Discovery: resolving names](configuration-discovery-resolve.md)
 - [Discovery: classifying servers](configuration-discovery-classifying.md)
 - [Discovery: Pseudo-GTID](configuration-discovery-pseudo-gtid.md)
```

go/config/config.go

Lines changed: 15 additions & 0 deletions

```diff
@@ -142,6 +142,10 @@ type Configuration struct {
 	DiscoverByShowSlaveHosts bool // Attempt SHOW SLAVE HOSTS before PROCESSLIST
 	UseSuperReadOnly         bool // Should orchestrator super_read_only any time it sets read_only
 	InstancePollSeconds      uint // Number of seconds between instance reads
+	DeadInstancePollSecondsMultiplyFactor float32 // InstancePollSeconds increase factor for dead instance read time calculation
+	DeadInstancePollSecondsMax            uint    // Maximum delay between dead instance read attempts
+	DeadInstanceDiscoveryMaxConcurrency   uint    // Number of goroutines doing dead hosts discovery
+	DeadInstanceDiscoveryLogsEnabled      bool    // Enable logs related to dead instance discoveries
 	ReasonableInstanceCheckSeconds uint // Number of seconds an instance read is allowed to take before it is considered invalid, i.e. before LastCheckValid will be false
 	InstanceWriteBufferSize        int  // Instance write buffer size (max number of instances to flush in one INSERT ODKU)
 	BufferInstanceWrites           bool // Set to 'true' for write-optimization on backend table (compromise: writes can be stale and overwrite non stale data)
@@ -332,6 +336,10 @@ func newConfiguration() *Configuration {
 		DefaultInstancePort: 3306,
 		TLSCacheTTLFactor:   100,
 		InstancePollSeconds: 5,
+		DeadInstancePollSecondsMultiplyFactor: 1,
+		DeadInstancePollSecondsMax:            5 * 60,
+		DeadInstanceDiscoveryMaxConcurrency:   0,
+		DeadInstanceDiscoveryLogsEnabled:      false,
 		ReasonableInstanceCheckSeconds: 1,
 		InstanceWriteBufferSize:        100,
 		BufferInstanceWrites:           false,
@@ -630,6 +638,13 @@ func (this *Configuration) postReadAdjustments() error {
 		this.ReasonableLockedSemiSyncMasterSeconds = uint(this.ReasonableReplicationLagSeconds)
 	}
 
+	if this.DeadInstancePollSecondsMultiplyFactor < 1 {
+		return fmt.Errorf("DeadInstancePollSecondsMultiplyFactor can not be smaller than 1")
+	}
+
+	if this.DeadInstancePollSecondsMax < this.InstancePollSeconds {
+		return fmt.Errorf("DeadInstancePollSecondsMax can not be smaller than InstancePollSeconds")
+	}
 	return nil
 }
```
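As a usage note on the checks above, a sketch of a configuration that would fail `postReadAdjustments` at startup, given the default `InstancePollSeconds` of 5 (values chosen only to trip the validation):

```json
{
  "DeadInstancePollSecondsMultiplyFactor": 0.5,
  "DeadInstancePollSecondsMax": 2
}
```

Either value alone is enough to make configuration loading return the corresponding error.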

go/discovery/queue.go

Lines changed: 9 additions & 0 deletions

```diff
@@ -69,6 +69,15 @@ func StopMonitoring() {
 	}
 }
 
+func ReturnQueue(name string) *Queue {
+	dcLock.Lock()
+	defer dcLock.Unlock()
+	if q, found := discoveryQueue[name]; found {
+		return q
+	}
+	return nil
+}
+
 // CreateOrReturnQueue allows for creation of a new discovery queue or
 // returning a pointer to an existing one given the name.
 func CreateOrReturnQueue(name string) *Queue {
```
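A short sketch of how the two lookups differ (assumes the orchestrator repo is importable; this is an illustration, not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/openark/orchestrator/go/discovery"
)

func main() {
	// CreateOrReturnQueue creates the queue on first use.
	q := discovery.CreateOrReturnQueue("DEFAULT")
	fmt.Println(q != nil) // true

	// ReturnQueue only looks up existing queues and yields nil for unknown
	// names; the new HTTP handlers rely on this to reject unknown queues.
	if discovery.ReturnQueue("DEADINSTANCES") == nil {
		fmt.Println("no DEADINSTANCES queue (DeadInstanceDiscoveryMaxConcurrency = 0)")
	}
}
```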

go/http/api.go

Lines changed: 56 additions & 13 deletions

```diff
@@ -2395,39 +2395,80 @@ func (this *HttpAPI) DiscoveryMetricsAggregated(params martini.Params, r render.
 	r.JSON(http.StatusOK, aggregated)
 }
 
-// DiscoveryQueueMetricsRaw returns the raw queue metrics (active and
-// queued values), data taken secondly for the last N seconds.
-func (this *HttpAPI) DiscoveryQueueMetricsRaw(params martini.Params, r render.Render, req *http.Request, user auth.User) {
+func (this *HttpAPI) discoveryQueueMetricsAggregatedCommon(params martini.Params, r render.Render, req *http.Request, user auth.User, queueName string) {
+	seconds, err := strconv.Atoi(params["seconds"])
+	log.Debugf("DiscoveryQueueMetricsAggregated: queue: %s, seconds: %d", queueName, seconds)
+	if err != nil {
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue aggregated metrics"})
+		return
+	}
+
+	queue := discovery.ReturnQueue(queueName)
+	if queue == nil {
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue aggregated metrics for unknown queue"})
+		return
+	}
+	aggregated := queue.AggregatedDiscoveryQueueMetrics(seconds)
+	log.Debugf("DiscoveryQueueMetricsAggregated data: %+v", aggregated)
+
+	r.JSON(http.StatusOK, aggregated)
+}
+
+func (this *HttpAPI) discoveryQueueMetricsRawCommon(params martini.Params, r render.Render, req *http.Request, user auth.User, queueName string) {
 	seconds, err := strconv.Atoi(params["seconds"])
 	log.Debugf("DiscoveryQueueMetricsRaw: seconds: %d", seconds)
 	if err != nil {
-		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue aggregated metrics"})
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue raw metrics"})
 		return
 	}
 
-	queue := discovery.CreateOrReturnQueue("DEFAULT")
+	queue := discovery.ReturnQueue(queueName)
+	if queue == nil {
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue raw metrics for unknown queue"})
+		return
+	}
 	metrics := queue.DiscoveryQueueMetrics(seconds)
 	log.Debugf("DiscoveryQueueMetricsRaw data: %+v", metrics)
 
 	r.JSON(http.StatusOK, metrics)
 }
 
+// DiscoveryQueueMetricsRaw returns the raw queue metrics (active and
+// queued values), data taken secondly for the last N seconds.
+func (this *HttpAPI) DiscoveryQueueMetricsRaw(params martini.Params, r render.Render, req *http.Request, user auth.User) {
+	this.discoveryQueueMetricsRawCommon(params, r, req, user, "DEFAULT")
+}
+
 // DiscoveryQueueMetricsAggregated returns a single value showing the metrics of the discovery queue over the last N seconds.
 // This is expected to be called every 60 seconds (?) and the config setting of the retention period is currently hard-coded.
 // See go/discovery/ for more information.
 func (this *HttpAPI) DiscoveryQueueMetricsAggregated(params martini.Params, r render.Render, req *http.Request, user auth.User) {
-	seconds, err := strconv.Atoi(params["seconds"])
-	log.Debugf("DiscoveryQueueMetricsAggregated: seconds: %d", seconds)
-	if err != nil {
-		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue aggregated metrics"})
+	this.discoveryQueueMetricsAggregatedCommon(params, r, req, user, "DEFAULT")
+}
+
+// DiscoveryQueueMetricsRaw2 returns the raw queue metrics (active and
+// queued values), data taken secondly for the last N seconds.
+func (this *HttpAPI) DiscoveryQueueMetricsRaw2(params martini.Params, r render.Render, req *http.Request, user auth.User) {
+	queue, found := params["queue"]
+	if !found {
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue raw metrics"})
 		return
 	}
 
-	queue := discovery.CreateOrReturnQueue("DEFAULT")
-	aggregated := queue.AggregatedDiscoveryQueueMetrics(seconds)
-	log.Debugf("DiscoveryQueueMetricsAggregated data: %+v", aggregated)
+	this.discoveryQueueMetricsRawCommon(params, r, req, user, queue)
+}
 
-	r.JSON(http.StatusOK, aggregated)
+// DiscoveryQueueMetricsAggregated2 returns a single value showing the metrics of the discovery queue over the last N seconds.
+// This is expected to be called every 60 seconds (?) and the config setting of the retention period is currently hard-coded.
+// See go/discovery/ for more information.
+func (this *HttpAPI) DiscoveryQueueMetricsAggregated2(params martini.Params, r render.Render, req *http.Request, user auth.User) {
+	queue, found := params["queue"]
+	if !found {
+		Respond(r, &APIResponse{Code: ERROR, Message: "Unable to generate discovery queue aggregated metrics"})
+		return
+	}
+
+	this.discoveryQueueMetricsAggregatedCommon(params, r, req, user, queue)
 }
 
 // BackendQueryMetricsRaw returns the raw backend query metrics
@@ -3982,6 +4023,8 @@ func (this *HttpAPI) RegisterRequests(m *martini.ClassicMartini) {
 	this.registerAPIRequest(m, "discovery-metrics-aggregated/:seconds", this.DiscoveryMetricsAggregated)
 	this.registerAPIRequest(m, "discovery-queue-metrics-raw/:seconds", this.DiscoveryQueueMetricsRaw)
 	this.registerAPIRequest(m, "discovery-queue-metrics-aggregated/:seconds", this.DiscoveryQueueMetricsAggregated)
+	this.registerAPIRequest(m, "discovery-queue-metrics-raw/:queue/:seconds", this.DiscoveryQueueMetricsRaw2)
+	this.registerAPIRequest(m, "discovery-queue-metrics-aggregated/:queue/:seconds", this.DiscoveryQueueMetricsAggregated2)
 	this.registerAPIRequest(m, "backend-query-metrics-raw/:seconds", this.BackendQueryMetricsRaw)
 	this.registerAPIRequest(m, "backend-query-metrics-aggregated/:seconds", this.BackendQueryMetricsAggregated)
 	this.registerAPIRequest(m, "write-buffer-metrics-raw/:seconds", this.WriteBufferMetricsRaw)
```
go/inst/dead_instance_filter.go

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@

```go
package inst

import (
	"sync"
	"time"

	"github.com/openark/golib/log"
	"github.com/openark/orchestrator/go/config"
	"github.com/rcrowley/go-metrics"
)

// The behavior depends on settings:
// 1. DeadInstanceDiscoveryMaxConcurrency > 0 and DeadInstancePollSecondsMultiplyFactor > 1:
//    A separate discovery queue for dead instances is created, dead instances
//    are checked by a dedicated pool of Go workers, and exponential backoff
//    is applied between checks.
// 2. DeadInstanceDiscoveryMaxConcurrency = 0 and DeadInstancePollSecondsMultiplyFactor > 1:
//    No separate discovery queue for dead instances is created and dead instances
//    are checked by the same pool of Go workers as healthy instances; however,
//    exponential backoff is applied for dead instances.
// 3. DeadInstanceDiscoveryMaxConcurrency > 0 and DeadInstancePollSecondsMultiplyFactor = 1:
//    A separate discovery queue for dead instances is created and dead instances
//    are checked by a dedicated pool of Go workers. No exponential backoff
//    is applied for dead instances.
// 4. DeadInstanceDiscoveryMaxConcurrency = 0 and DeadInstancePollSecondsMultiplyFactor = 1:
//    No separate discovery queue for dead instances, no dedicated Go workers,
//    no backoff mechanism. This is the default working mode.
//
// We always register a dead instance. It shouldn't be a big overhead,
// and we get the info about the dead instances count.

type deadInstance struct {
	DelayFactor   float32
	NextCheckTime time.Time
	TryCnt        int
}

type deadInstancesFilter struct {
	deadInstances      map[InstanceKey]deadInstance
	deadInstancesMutex sync.RWMutex
}

var DeadInstancesFilter deadInstancesFilter

var deadInstancesCounter = metrics.NewCounter()

func init() {
	metrics.Register("discoveries.dead_instances", deadInstancesCounter)
	DeadInstancesFilter.deadInstances = make(map[InstanceKey]deadInstance)
	DeadInstancesFilter.deadInstancesMutex = sync.RWMutex{}
}

// RegisterInstance registers a given instance in the dead instances cache.
// Once the instance is registered, its discovery can be delayed with an
// exponential backoff mechanism according to the
// DeadInstancePollSecondsMultiplyFactor value.
// During registration, the next desired check time is calculated based on
// the current delay factor and the DeadInstancePollSecondsMultiplyFactor and
// DeadInstancePollSecondsMax parameters.
func (f *deadInstancesFilter) RegisterInstance(instanceKey *InstanceKey) {
	delayFactor := float32(1)
	previousTry := 0

	f.deadInstancesMutex.Lock()
	defer f.deadInstancesMutex.Unlock()

	instance, exists := f.deadInstances[*instanceKey]
	if exists {
		delayFactor = config.Config.DeadInstancePollSecondsMultiplyFactor * instance.DelayFactor
		previousTry = instance.TryCnt
	} else {
		deadInstancesCounter.Inc(1)
	}

	maxDelay := time.Duration(config.Config.DeadInstancePollSecondsMax) * time.Second
	currentDelay := time.Duration(delayFactor*float32(config.Config.InstancePollSeconds)) * time.Second

	// needed only for the debug log below
	delayFactorTmp := delayFactor

	if currentDelay > maxDelay {
		// saturation
		currentDelay = maxDelay
		delayFactor = instance.DelayFactor // back to the previous one
	}
	nextCheck := time.Now().Add(currentDelay)

	instance = deadInstance{
		DelayFactor:   delayFactor,
		NextCheckTime: nextCheck,
		TryCnt:        previousTry + 1,
	}
	f.deadInstances[*instanceKey] = instance

	if config.Config.DeadInstanceDiscoveryLogsEnabled {
		log.Debugf("Dead instance registered %v:%v. Iteration: %v. Current delay factor: %v (next check in %v (on %v))",
			instanceKey.Hostname, instanceKey.Port, instance.TryCnt, delayFactorTmp, currentDelay, instance.NextCheckTime)
	}
}

// UnregisterInstance removes the given instance from the dead instances cache.
func (f *deadInstancesFilter) UnregisterInstance(instanceKey *InstanceKey) {
	f.deadInstancesMutex.Lock()
	defer f.deadInstancesMutex.Unlock()

	instance, exists := f.deadInstances[*instanceKey]
	if exists {
		if config.Config.DeadInstanceDiscoveryLogsEnabled {
			log.Debugf("Dead instance unregistered: %v:%v after iteration: %v",
				instanceKey.Hostname, instanceKey.Port, instance.TryCnt)
		}
		deadInstancesCounter.Dec(1)
		delete(f.deadInstances, *instanceKey)
	}
}

// InstanceRecheckNeeded checks whether a given instance is registered in the
// dead instances cache and, if it is, whether it is time to rediscover it.
// It returns two boolean values:
// - the first indicates whether the instance is registered;
// - the second indicates whether it is time to rediscover the node.
func (f *deadInstancesFilter) InstanceRecheckNeeded(instanceKey *InstanceKey) (bool, bool) {
	f.deadInstancesMutex.RLock()
	defer f.deadInstancesMutex.RUnlock()

	instance, exists := f.deadInstances[*instanceKey]

	if !exists {
		return exists, false
	}

	if instance.NextCheckTime.After(time.Now()) {
		// recheck time is still in the future
		return exists, false
	}

	if config.Config.DeadInstanceDiscoveryLogsEnabled {
		log.Debugf("Dead instance recheck: %v:%v. Iteration: %v",
			instanceKey.Hostname, instanceKey.Port, instance.TryCnt)
	}
	return exists, true
}
```
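A sketch of the intended call sequence for this filter (assumes the orchestrator repo is importable; the discovery code wiring these calls together is not part of this file):

```go
package main

import (
	"fmt"

	"github.com/openark/orchestrator/go/inst"
)

func main() {
	key := inst.InstanceKey{Hostname: "db1.example.com", Port: 3306}

	// A failed discovery registers (or re-registers) the instance,
	// pushing NextCheckTime further out on each consecutive failure.
	inst.DeadInstancesFilter.RegisterInstance(&key)

	// Before rediscovering, the poller asks whether the backoff has elapsed.
	registered, recheck := inst.DeadInstancesFilter.InstanceRecheckNeeded(&key)
	fmt.Println(registered, recheck) // true, false (next check is still in the future)

	// A successful discovery removes the instance from the cache.
	inst.DeadInstancesFilter.UnregisterInstance(&key)
}
```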
